Kafka:Producer自定义Partition负载均衡
  bp5SCceIOyYK 2023年11月02日 59 0


​pom.xml​​:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.kaven</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
</project>

测试代码:

package com.kaven.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerTest {

public static void main(String[] args) throws ExecutionException, InterruptedException {
send("new-topic-user");
}

public static void send(String name) throws ExecutionException, InterruptedException {
Producer<String, String> producer = ProducerTest.createProducer();
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
name,
"key-" + i,
"value-" + i
);
// 异步发送并回调
producer.send(producerRecord, (metadata, exception) -> {
if(exception == null) {
System.out.println("partition: " + metadata.partition() + " offset: " + metadata.offset());
}
else {
exception.printStackTrace();
}
});
}
// 要关闭Producer实例
producer.close();
}

public static Producer<String, String> createProducer() {
// Producer的配置
Properties properties = new Properties();
// 服务地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.7:9092");
// KEY的序列化器类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// VALUE的序列化器类
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 分区器类
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kaven.kafka.producer.PartitionLoadBalancer");

return new KafkaProducer<>(properties);
}
}

​Producer​​​自定义​​Partition​​​负载均衡需要实现​​org.apache.kafka.clients.producer.Partitioner​​接口。

package com.kaven.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class PartitionLoadBalancer implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int num = Integer.parseInt(((String) key).split("-")[1]) % cluster.partitionCountForTopic(topic);
System.out.println(key + " : " + num);
return num;
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}

​PartitionLoadBalancer​​​根据消息​​key​​的最后一位数字(这里根据自己的需求来设计)来选择分区。

输出:

key-0 : 0
key-1 : 1
key-2 : 2
key-3 : 0
key-4 : 1
key-5 : 2
key-6 : 0
key-7 : 1
key-8 : 2
key-9 : 0
partition: 2 offset: 37
partition: 2 offset: 38
partition: 2 offset: 39
partition: 0 offset: 42
partition: 0 offset: 43
partition: 0 offset: 44
partition: 0 offset: 45
partition: 1 offset: 50
partition: 1 offset: 51
partition: 1 offset: 52

输出符合预期,​​Producer​​​自定义​​Partition​​负载均衡就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  M5nxXzKbD3Q7   2023年11月12日   22   0   0 xmlmavenjar
  ehrZuhofWJiC   2024年05月17日   61   0   0 mavenIDEA
bp5SCceIOyYK