Kafka API Operations

First, add the Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
</dependency>
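
The kafka_2.11 artifact bundles the broker-side (Scala) classes together with the Java clients. If you only need the producer/consumer API used in this post, the standalone client artifact at the same version should also work; this is an optional alternative, not something the setup above requires:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>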

Then place consumer.properties and producer.properties under the resources folder. You can copy them from your own Kafka installation, or use the versions below. The directory structure looks like this:

(screenshot: resources directory containing producer.properties and consumer.properties)

producer.properties

# see org.apache.kafka.clients.producer.ProducerConfig for more details
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=blog.partitioner.RoundRobinPartitioner
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
linger.ms=5000
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
batch.size=1024
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
buffer.memory=10240
# serializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
  • Configuration to specify when creating a producer (a selection of the parameters; a programmatic version is sketched after this list)
  • bootstrap.servers: the Kafka brokers to bootstrap from
  • key.serializer: serializer for the message key
  • value.serializer: serializer for the message value
  • acks=[0|1|-1|all]: the message acknowledgement mode
  • 0: no acknowledgement; just fire and forget
  • -1 / all: the leader must write the data to its local log and acknowledge, and it also waits for all in-sync followers to acknowledge
  • 1: only the leader needs to acknowledge; followers replicate from it afterwards
  • batch.size: size of the per-partition buffer for records that have not yet been sent
  • linger.ms: how long to delay a send so more records can be batched together (a full batch is sent immediately)
  • buffer.memory: the total buffer memory available to the whole producer
  • retries: how many times to retry after a failed send
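
These settings can also be supplied in code instead of being loaded from producer.properties. A minimal sketch using the ProducerConfig constants from the client library; the class name is just for this sketch, and the acks and retries values are illustrative rather than taken from the file above:

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class ProducerProps {
    public static Properties build() {
        Properties props = new Properties();
        //same brokers and serializers as producer.properties
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //illustrative values for the parameters described above
        props.put(ProducerConfig.ACKS_CONFIG, "all");    //leader and all in-sync followers must acknowledge
        props.put(ProducerConfig.RETRIES_CONFIG, "3");   //retry a failed send up to 3 times
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "1024");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "10240");
        return props;
    }
}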

consumer.properties

# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
# consumer group id
group.id=test-consumer-group
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
auto.offset.reset=earliest
# deserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

First, make sure Kafka is up and running on the cluster!

1. Kafka producer API operations

With producer.properties configured, create a producer class:

package blog.creatures;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @Author Daniel
 * @Description Kafka producer
 **/
public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        //properties.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        properties.load(KafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
        //key is the type of each message key sent to the topic (may be null); value is the type of the message value
        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
        int start = 0;
        int end = start + 10;
        for (int i = start; i < end; i++) {
            //send the data
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("hadoop", "hadoop-" + i, "This is my msg");
            producer.send(record);
        }
        Thread.sleep(10000);
        producer.close();
    }
}
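
send() is asynchronous, which is why the code sleeps before closing. If you want per-record confirmation instead of a sleep, the client also offers a send(record, Callback) overload that reports the partition and offset (or the exception) once the broker acknowledges. A minimal sketch; this would replace the plain producer.send(record) call inside the loop above:

            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    //the send failed even after any retries
                    exception.printStackTrace();
                } else {
                    System.out.printf("acked: partition=%d offset=%d%n", metadata.partition(), metadata.offset());
                }
            });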

First, start a console consumer on the cluster:

kafka-console-consumer.sh \
--topic hadoop \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092

Then run the code.

You can see that after a five-second delay (the linger.ms setting), the data was sent to the cluster in a single batch.

(screenshot: console consumer output showing the batch arriving at once)

2. Kafka consumer API operations

With consumer.properties configured, create a consumer class:

package blog.creatures;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

/**
 * @Author Daniel
 * @Description Kafka consumer
 **/
public class KafkaConsumer {
    public static void main(String[] args) throws IOException {
        Properties conf = new Properties();
        conf.load(KafkaConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        Consumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(conf);

        //subscribe to the topic once, outside the polling loop
        consumer.subscribe(Arrays.asList("hadoop"));
        while (true) {
            //fetch data, waiting up to 1000 ms for new records
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : consumerRecords) {
                //get the key, value, offset, partition and timestamp
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                long timestamp = record.timestamp();
                System.out.printf("key: %s\tvalue: %s\toffset: %d\tpartition: %d\ttimestamp: %d\n",
                        key, value, offset, partition, timestamp);
            }
        }
        //consumer.close();
    }
}
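
Because the while loop never exits, the commented-out consumer.close() is unreachable. A common shutdown pattern, not part of the original code, is to call consumer.wakeup() from a shutdown hook; the blocked poll() then throws WakeupException and the consumer can be closed cleanly:

        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        try {
            consumer.subscribe(Arrays.asList("hadoop"));
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
                //...process the records as above...
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            //expected: poll() was interrupted by wakeup() during shutdown
        } finally {
            consumer.close();
        }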

Start the Consumer first, then start the Producer; the Consumer's console prints the data produced by the Producer.

(screenshot: Consumer console printing the produced records)

3. Kafka partitioner API operations

Every record carries a topic name, an optional partition number, and an optional key/value pair. The partition is chosen as follows (see the sketch after this list):

  • If a partition is specified, the record goes straight to that partition
  • If no partition is specified but a key is, the partition is chosen by hashing the key
  • If neither a partition nor a key is specified, partitions are assigned in round-robin fashion
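
These three cases map directly onto ProducerRecord's constructors; a quick sketch using the topic and message from this post:

        //partition specified explicitly: the record goes straight to partition 1
        new ProducerRecord<String, String>("hadoop", 1, "hadoop-1", "This is my msg");
        //key but no partition: the partition is chosen from the hash of the key
        new ProducerRecord<String, String>("hadoop", "hadoop-1", "This is my msg");
        //neither partition nor key: partitions are assigned round-robin
        new ProducerRecord<String, String>("hadoop", "This is my msg");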

Custom partitioners

3.1 Random partitioner
package blog.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

/**
 * @Author Daniel
 * @Description Kafka random partitioner
 **/
public class RandomPartitioner implements Partitioner {
    private Random random = new Random();

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer partitionCount = cluster.partitionCountForTopic(topic);//number of partitions for the current topic
        int partition = random.nextInt(partitionCount);//pick one at random
        System.out.println("key: " + key + "\tpartition: " + partition);
        return partition;
    }
}

Then set partitioner.class=blog.partitioner.RandomPartitioner (the fully qualified class name) in producer.properties.
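
If you would rather not edit producer.properties, the same setting can be applied in code before the producer is built (the same approach works for the hash and round-robin partitioners below); this assumes an import of org.apache.kafka.clients.producer.ProducerConfig:

        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "blog.partitioner.RandomPartitioner");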

Then run the producer again.

(screenshot: random partitioner output)

You can see that keys 0, 2, 3 and 7 all went into partition 1, so the assignment clearly follows no order.

3.2 Hash partitioner
package blog.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @Author Daniel
 * @Description Kafka hash partitioner
 **/
public class HashPartitioner implements Partitioner {
    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer partCount = cluster.partitionCountForTopic(topic);
        //partitioning algorithm: hash of the key modulo the partition count
        //(masking the sign bit keeps the result non-negative even when hashCode() is Integer.MIN_VALUE,
        //which Math.abs() would not)
        int partition = (key.hashCode() & Integer.MAX_VALUE) % partCount;
        System.out.println("key: " + key + "\tpartition: " + partition);
        return partition;
    }
}

Then set partitioner.class=blog.partitioner.HashPartitioner (the fully qualified class name) in producer.properties.

Then run the producer again.

(screenshot: hash partitioner output)

My topic has 4 partitions here, so 0%4, 4%4 and 8%4 all went into partition 0; in other words, keys with the same remainder end up in the same partition.

3.3 Round-robin partitioner
package blog.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author Daniel
 * @Description Kafka round-robin partitioner
 **/
public class RoundRobinPartitioner implements Partitioner {
    //an atomic counter used to rotate through the partitions
    private AtomicInteger count = new AtomicInteger();

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer parCount = cluster.partitionCountForTopic(topic);
        //take the partitions in turn
        //(note: once the counter overflows Integer.MAX_VALUE the remainder goes negative;
        //fine for a demo, but mask the sign bit in real code)
        int partition = count.getAndIncrement() % parCount;
        System.out.println("key: " + key + "\tpartition: " + partition);
        return partition;
    }
}

Then set partitioner.class=blog.partitioner.RoundRobinPartitioner (the fully qualified class name) in producer.properties.

Then run the producer again.

(screenshot: round-robin partitioner output)

You can see the records are partitioned in order: 0 goes to partition 0, 1 to partition 1, and so on.
