First, add the Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
</dependency>
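If you only need the producer and consumer client APIs, the lighter kafka-clients artifact is sufficient on its own (the kafka_2.11 artifact above already pulls it in transitively):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>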
Next, place consumer.properties and producer.properties under the resources folder. You can copy them from the config directory of your own Kafka installation, or use the versions below.
producer.properties
# see org.apache.kafka.clients.producer.ProducerConfig for more details
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none
# name of the partitioner class for partitioning records; by default the key hash is used (round-robin when the key is null)
# partitioner.class=blog.partitioner.RoundRobinPartitioner
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
linger.ms=5000
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
batch.size=1024
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
buffer.memory=10240
# serializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
- Configuration to supply when creating a producer (selected parameters; a programmatic sketch follows this list):
  - bootstrap.servers: the Kafka brokers to bootstrap from
  - key.serializer: serializer for record keys
  - value.serializer: serializer for record values
  - acks=[0|-1|1|all]: the message acknowledgment mode
    - 0: no acknowledgment; the producer just fires and forgets
    - -1/all: the leader must write the data to its local log and also wait for all in-sync followers to acknowledge
    - 1: only the leader needs to acknowledge; followers can replicate from the leader afterwards
  - batch.size: per-partition buffer size (in bytes) for records not yet sent
  - linger.ms: how long to delay sending a request, whether or not the buffer is full
  - buffer.memory: the total buffer memory available to the whole producer
  - retries: number of retries after a failed send
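If you prefer configuring the producer in code rather than through producer.properties, a minimal sketch of the same settings follows (the acks and retries values here are illustrative additions, not taken from the file above):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerConfigExample {
    // Builds the same settings as producer.properties, but programmatically
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");            // leader + all in-sync replicas must acknowledge
        props.put(ProducerConfig.RETRIES_CONFIG, "3");           // retries after a failed send
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "1024");     // per-partition batch buffer, in bytes
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");      // delay sends to allow batching, in ms
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "10240"); // total producer buffer, in bytes
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}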
consumer.properties
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
# consumer group id
group.id=test-consumer-group
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
auto.offset.reset=earliest
# deserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
First, make sure Kafka is running on the cluster!
1. Kafka producer API operations
With producer.properties configured, create a producer class:
package blog.creatures;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @Author Daniel
 * @Description Kafka producer
 **/
public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        //properties.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        properties.load(KafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
        // The first type parameter is the type of each record's key (may be null), the second the type of its value.
        // The fully qualified name is required because this class shadows Kafka's own KafkaProducer.
        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);
        int start = 0;
        int end = start + 10;
        for (int i = start; i < end; i++) {
            // Send the data
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("hadoop", "hadoop-" + i, "This is my msg");
            producer.send(record);
        }
        // Sleep past linger.ms (5000 ms) so the batched records are actually sent before closing
        Thread.sleep(10000);
        producer.close();
    }
}
First start a console consumer on the cluster:
kafka-console-consumer.sh \
--topic hadoop \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092
Then run the code. After a delay of five seconds (the linger.ms setting), all the records are sent to the cluster in a single batch.
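send is asynchronous; to confirm each record on the client side as well, it also accepts an optional callback that fires once the broker acknowledges the write. A minimal sketch, reusing the producer.properties above (the class name is illustrative):

package blog.creatures;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class KafkaProducerWithCallback {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.load(KafkaProducerWithCallback.class.getClassLoader().getResourceAsStream("producer.properties"));
        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("hadoop", "key-0", "This is my msg");
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Called once per record: metadata on success, exception on failure
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("acked: partition=" + metadata.partition() + "\toffset=" + metadata.offset());
                }
            }
        });
        producer.close();
    }
}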
2. Kafka consumer API operations
With consumer.properties configured, create a consumer class:
package blog.creatures;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

/**
 * @Author Daniel
 * @Description Kafka consumer
 **/
public class KafkaConsumer {
    public static void main(String[] args) throws IOException {
        Properties conf = new Properties();
        conf.load(KafkaConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        Consumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(conf);
        // Subscribe to the topic once, outside the poll loop
        consumer.subscribe(Arrays.asList("hadoop"));
        while (true) {
            // Poll for data, waiting up to 1000 ms
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : consumerRecords) {
                // Extract the key, value, offset, partition and timestamp
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                long timestamp = record.timestamp();
                System.out.printf("key: %s\tvalue: %s\toffset: %d\tpartition: %d\ttimestamp: %d\n",
                        key, value, offset, partition, timestamp);
            }
        }
        //consumer.close();
    }
}
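By default (enable.auto.commit=true) offsets are committed automatically in the background. If records must not be marked consumed before they are actually processed, commit manually instead. A minimal sketch, assuming enable.auto.commit=false has been added to consumer.properties (the class name is illustrative):

package blog.creatures;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerManualCommit {
    public static void main(String[] args) throws IOException {
        Properties conf = new Properties();
        conf.load(KafkaConsumerManualCommit.class.getClassLoader().getResourceAsStream("consumer.properties"));
        Consumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(conf);
        consumer.subscribe(Arrays.asList("hadoop"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset: %d\tvalue: %s\n", record.offset(), record.value());
            }
            // Commit only after the whole batch is processed, so a crash
            // before this line means the records will be re-delivered
            consumer.commitSync();
        }
    }
}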
Start the Consumer first, then start the Producer; the Consumer's console prints the records produced by the Producer.
3. Kafka partitioning API operations
Every record carries a topic name, an optional partition number, and an optional key/value pair. Records are routed to partitions as follows (see the sketch after this list):
- If a partition is specified, the record goes straight to that partition
- If no partition is specified but a key is, the partition is chosen from the key's hash
- If neither a partition nor a key is specified, partitions are assigned round-robin
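The three cases map onto the three ProducerRecord constructors; a quick sketch (topic and values are illustrative):

import org.apache.kafka.clients.producer.ProducerRecord;

public class RecordRoutingExamples {
    public static void main(String[] args) {
        // Partition given explicitly: the record goes straight to partition 2
        ProducerRecord<String, String> explicit = new ProducerRecord<String, String>("hadoop", 2, "k1", "v1");
        // Key but no partition: the partition is derived from the key's hash
        ProducerRecord<String, String> keyed = new ProducerRecord<String, String>("hadoop", "k1", "v1");
        // Neither partition nor key: partitions are rotated round-robin
        ProducerRecord<String, String> bare = new ProducerRecord<String, String>("hadoop", "v1");
        System.out.println(explicit.partition() + " / " + keyed.key() + " / " + bare.value());
    }
}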
Custom partitioners
3.1 Random partitioning
package blog.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

/**
 * @Author Daniel
 * @Description Kafka random partitioner
 **/
public class RandomPartitioner implements Partitioner {
    private Random random = new Random();

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Number of partitions for the current topic
        Integer partitionCount = cluster.partitionCountForTopic(topic);
        // Choose a partition at random
        int partition = random.nextInt(partitionCount);
        System.out.println("key: " + key + "\tpartition: " + partition);
        return partition;
    }
}
Then set partitioner.class=blog.partitioner.RandomPartitioner (the fully qualified class name) in producer.properties and run the producer again.
Keys 0, 2, 3 and 7 all landed in partition 1; the assignment clearly follows no particular order.
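Instead of editing producer.properties, the partitioner can also be set in code. A minimal sketch, assuming the KafkaProducer class from section 1 (place this line before the producer is constructed, with org.apache.kafka.clients.producer.ProducerConfig imported):

// Equivalent to partitioner.class=blog.partitioner.RandomPartitioner in producer.properties
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, blog.partitioner.RandomPartitioner.class.getName());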
3.2 Hash partitioning
package blog.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @Author Daniel
 * @Description Kafka hash partitioner
 **/
public class HashPartitioner implements Partitioner {
    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer partCount = cluster.partitionCountForTopic(topic);
        // Partitioning rule: keys with the same hash remainder land in the same partition.
        // Fall back to partition 0 for null keys, which would otherwise throw a NullPointerException.
        int partition = key == null ? 0 : Math.abs(key.hashCode()) % partCount;
        System.out.println("key: " + key + "\tpartition: " + partition);
        return partition;
    }
}
Then set partitioner.class=blog.partitioner.HashPartitioner (the fully qualified class name) in producer.properties and run the producer again.
My topic has 4 partitions, so keys 0, 4 and 8 (0%4, 4%4, 8%4) all went to partition 0: keys whose hashes leave the same remainder end up in the same partition.
3.3 Round-robin partitioning
package blog.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author Daniel
 * @Description Kafka round-robin partitioner
 **/
public class RoundRobinPartitioner implements Partitioner {
    // An atomic counter, so concurrent sends still rotate correctly
    private AtomicInteger count = new AtomicInteger();

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer parCount = cluster.partitionCountForTopic(topic);
        // Rotate through the partitions in order
        int partition = count.getAndIncrement() % parCount;
        System.out.println("key: " + key + "\tpartition: " + partition);
        return partition;
    }
}
Then set partitioner.class=blog.partitioner.RoundRobinPartitioner (the fully qualified class name) in producer.properties and run the producer again.
The records are assigned to partitions in order: key 0 goes to partition 0, key 1 to partition 1, and so on.