Kafka Partition分发策略
  TEZNKK3IfmPf 2023年11月13日 60 0

今天突然想起一个问题,当producer往kafka写数据的时候,如果不指定Patition,也没有指定Key的话,那么它是怎么做的负载均衡?

通过查看kafka源码,发现Kafka Java客户端有默认的partition分配机制。

实现如下:

 /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

从源码可以看出,首先获取topic的所有Patition,如果客户端不指定Patition,也没有指定Key的话,使用自增长的数字取余数的方式实现指定的Partition。这样Kafka将平均的向Partition中生产数据。

如果想要控制发送的partition,则有两种方式,一种是指定partition,另一种就是根据Key自己写算法。继承Partitioner接口,实现其partition方法。并且配置启动参数

props.put("partitioner.class","TestPartitioner")
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月13日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2023年11月13日   61   0   0 kafka
  TEZNKK3IfmPf   2023年11月14日   47   0   0 kafka
  TEZNKK3IfmPf   2023年11月13日   31   0   0 扩容kafka
  TEZNKK3IfmPf   2023年11月13日   42   0   0 kafka
  TEZNKK3IfmPf   2023年11月13日   53   0   0 javakafka数据
TEZNKK3IfmPf