kafka 客户端之producer API发送消息(自定义负载均衡实现)与负载均衡调用源码分析
  707fzl7bQVmn 2023年11月02日 49 0


背景:​​kafka 客户端之producer API发送消息以及简单源码分析​​已经介绍了producer的异步发送和异步回调发送消息的基本使用,但是都是使用内置的负载均衡策略。kafka的负载均衡是在客户端实现的。

自定义负载均衡实现

在某些特殊的业务场景下我们经常会有自定义负载均衡算法的需求,在Kafka中可以通过实现Partitioner接口来自定义Partition负载均衡器。

kafka自带的有三种实现

kafka 客户端之producer API发送消息(自定义负载均衡实现)与负载均衡调用源码分析_kafka

  • DefaultPartitioner:如果record中指定了分区,则使用它;如果未指定分区但存在key,则根据key的hash选择分区;如果不存在分区或key,则选择在批处理已满时更改的sticky partition。
  • UniformStickyPartitioner:如果record中指定了分区,则使用它;否则选择batch已满时更改的sticky partition。 注意:与 DefaultPartitioner 相比,record key不用作此分区器中分区策略的一部分。 具有相同键的record不保证发送到同一个分区。 有关sticky partition的详细信息,请参阅 KIP-480
  • RoundRobinPartitioner: “循环”分区器 当用户希望将写入平均分配到所有分区时,可以使用此分区策略。这是与record key hash无关的行为

自己实现Partitioner接口

public class MyPartition implements Partitioner {

/**
* 自定义策略,取随机数
**/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

int partitionsNum = cluster.partitionsForTopic(topic).size();
// 使用 ThreadLocalRandom 产生随机数
return ThreadLocalRandom.current().nextInt(partitionsNum);

}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}

}

new 一个producer实例的时候,把自己的负载均衡实现类的全路径名导入进去。

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.xt.kafkademo.producer.MyPartition");

Producer异步发送消息(带回调函数和自定义Partition负载均衡)

/*
Producer异步发送带回调函数和Partition负载均衡
*/
public static void producerSendWithCallbackAndPartition(Producer<String,String> producer){


// 消息对象 - ProducerRecoder
for(int i=0;i<10;i++){
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);

producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null){
e.printStackTrace();
}else{
System.out.println("partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
}

}
});

}

// 所有的通道打开都需要关闭
producer.close();
}

完整代码

public class ProducerSample {

private final static String TOPIC_NAME="xt";

/**
* 创建Producer实例
*/
public static Producer<String, String> createProducer(boolean mypartition) {
Properties properties = new Properties();
//配置文件里面的变量都是静态final类型的,并且都有默认的值
//用于建立与 kafka 集群连接的 host/port
//继承的hashtable,保证了线程安全
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka服务器IP:9092");
/**
* producer 需要 server 接收到数据之后发出的确认接收的信号,此项配置就是指 procuder需要多少个这样的确认信号。此配置实际上代表
* 了数据备份的可用性。以下设置为常用选项:
* (1)acks=0: 设置为 0 表示 producer 不需要等待任何确认收到的信息。副本将立即加到socket buffer 并认为已经发送。没有任何保
* 障可以保证此种情况下 server 已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的 offset 会总是设置为-1;
* (2)acks=1: 这意味着至少要等待 leader已经成功将数据写入本地 log,但是并没有等待所有 follower 是否成功写入。这种情况下,如
* 果 follower 没有成功备份数据,而此时 leader又挂掉,则消息会丢失。
* (3)acks=all: 这意味着 leader 需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
* (4)其他的设置,例如 acks=2 也是可以的,这将需要给定的 acks 数量,但是这种策略一般很少用
**/
properties.put(ProducerConfig.ACKS_CONFIG,"all");
/**
设置大于 0 的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许
重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个 partition,则第一个消息失败第二个发送成功,则第二条消息会比第一
条消息出现要早
**/
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
/**
* producer 将试图批处理消息记录,以减少请求次数。这将改善 client 与 server 之间的性能。这项配置控制默认的批量处理消息字节数。
* 不会试图处理大于这个字节数的消息字节数。发送到 brokers 的请求将包含多个批量处理,其中会包含对每个 partition 的一个请求。
* 较小的批量处理数值比较少用,并且可能降低吞吐量(0 则会仅用批量处理)。较大的批量处理数值将会浪费更多内存空间,这样就需要分配特
* 定批量处理数值的内存大小
**/
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
/**
* producer 组将会汇总任何在请求与发送之间到达的消息记录一个单独批量的请求。通常来说,这只有在记录产生速度大于发送速度的时候才
* 能发生。然而,在某些条件下,客户端将希望降低请求的数量,甚至降低到中等负载一下。这项设置将通过增加小的延迟来完成--即,不是立即
* 发送一条记录,producer 将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理。这可以认为是 TCP 种 Nagle 的算
* 法类似。这项设置设定了批量处理的更高的延迟边界:一旦我们获得某个 partition 的batch.size,他将会立即发送而不顾这项设置,
* 然而如果我们获得消息字节数比这项设置要小的多,我们需要“linger”特定的时间以获取更多的消息。 这个设置默认为 0,即没有延迟。设
* 定 linger.ms=5,例如,将会减少请求数目,但是同时会增加 5ms 的延迟
**/
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
/**
* producer 可以用来缓存数据的内存大小。如果数据产生速度大于向 broker 发送的速度,将会耗尽这个缓存空间,producer
* 会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和 producer 能够使用的总内存相关,但并不是一个
* 硬性的限制,因为不是producer 使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些
* 用于维护请求当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。
**/
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
/**
* 该配置控制 KafkaProducer's send(),partitionsFor(),inittransaction (),sendOffsetsToTransaction(),commitTransaction() "
* 和abortTransaction()方法将阻塞。对于send(),此超时限制了获取元数据和分配缓冲区的总等待时间"
**/
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"5000");

//将消息发送到kafka server, 所以肯定需要用到序列化的操作 我们这里发送的消息是string类型的,所以使用string的序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
if(mypartition == true){
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.xt.kafkademo.producer.MyPartition");
}
return new KafkaProducer<>(properties);

}



public static void main(String[] args) throws ExecutionException, InterruptedException {

// Producer的主对象
Producer<String,String> producer = ProducerSample.createProducer(true);

// Producer异步发送带回调函数和Partition负载均衡
producerSendWithCallbackAndPartition(producer);

}





/*
Producer异步发送带回调函数和Partition负载均衡
*/
public static void producerSendWithCallbackAndPartition(Producer<String,String> producer){


// 消息对象 - ProducerRecoder
for(int i=0;i<10;i++){
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);

producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null){
e.printStackTrace();
}else{
System.out.println("partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
}

}
});

}

// 所有的通道打开都需要关闭
producer.close();
}

}

kafka是怎样调用我们的负载均衡实现类的

在自己的负载均衡类上的方法打断点

kafka 客户端之producer API发送消息(自定义负载均衡实现)与负载均衡调用源码分析_数据_02


然后debug运行自己的主程序

kafka 客户端之producer API发送消息(自定义负载均衡实现)与负载均衡调用源码分析_数据_03


我们可以发现是如下代码调用的我们自己的实现

KafkaProducer的 partition方法

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

而这个partitioner实例正是我们自己实现的那个负载均衡类的实例,他在KafkaProducer类中被定义为了私有 final 字段。

private final Partitioner partitioner;

而partitioner 是通过如下形式来实例化的

this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));

config是一个ProducerConfig实例,是在KafkaProducer的构造函数new出来的。如下代码所示。而下面的Map<String, Object> configs 正是由kafka封装的Utils.propsToMap(properties)转化而来,properties是我们主程序传入的配置。所以我们的设置是怎么传递的基本弄清楚了。

public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, valueSerializer)),
keySerializer, valueSerializer, null, null, null, Time.SYSTEM);
}

那config是怎么去new 我们自定义负载均衡实现类的呢?
先前我们说到了config是由我们传入的配置信息new 出来的,而他是一个ProducerConfig类的实例,他既然保存了我们的传入的配置信息,然后他再找出来是易于反掌

如下,已经拿到了我们自定义负载均衡类的全路径

kafka 客户端之producer API发送消息(自定义负载均衡实现)与负载均衡调用源码分析_数据_04


如下就是AbstractConfig类保存配置信息的字段,是一个Map,虽然是一个私有字段,但是ProducerConfig类的实例通过从AbstractConfig类继承的getClass方法(此方法是public的)拿到了配置信息

private final Map<String, Object> values;

拿到负载均衡类的全路径之后,kafka使用自己的封装的Utils来new 实例

private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) {
if (klass == null)
return null;

Object o;
if (klass instanceof String) {
try {
o = Utils.newInstance((String) klass, t);
} catch (ClassNotFoundException e) {
throw new KafkaException("Class " + klass + " cannot be found", e);
}
} else if (klass instanceof Class<?>) {
o = Utils.newInstance((Class<?>) klass);
} else
throw new KafkaException("Unexpected element of type " + klass.getClass().getName() + ", expected String or Class");
if (!t.isInstance(o))
throw new KafkaException(klass + " is not an instance of " + t.getName());
if (o instanceof Configurable)
((Configurable) o).configure(configPairs);

return t.cast(o);
}

进去Utils.newInstance一看,对的,还是使用的反射来创建的负载均衡实例

public static <T> T newInstance(Class<T> c) {
if (c == null)
throw new KafkaException("class cannot be null");
try {
return c.getDeclaredConstructor().newInstance();
} catch (NoSuchMethodException e) {
throw new KafkaException("Could not find a public no-argument constructor for " + c.getName(), e);
} catch (ReflectiveOperationException | RuntimeException e) {
throw new KafkaException("Could not instantiate class " + c.getName(), e);
}
}

(写博客主要是对自己学习的归纳整理,资料大部分来源于书籍、网络资料和自己的实践,整理不易,但是难免有不足之处,如有错误,请大家评论区批评指正。同时感谢广大博主和广大作者辛苦整理出来的资源和分享的知识。)


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
707fzl7bQVmn