import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
// 最简单的消息发送方式
object KafkaProducer1 {
def main(args: Array[String]): Unit = {
// brokers的地址,可以写多个。至少写2个避免单点故障
val brokers = "node1:9092, node2:9092"
val topic = "mykafka"
// Properties类实现了Map接口,本质上是一种简单的Map容器
val kafkaProps = new Properties()
// 要往Kafka写入消息,首先要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必选的属性
// bootstrap.servers,指定broker的地址清单。不需要包含所有的broker地址,生产者会从给定的broker里查找其他broker的信息,建议至少提供2个broker的信息
// key.serializer 将指定的值序列化。必选设置,即使key为空。 value.serializer 将value的值序列化
kafkaProps.put("bootstrap.servers", brokers)
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// 使用字符串常量更好,避免输入错误
// kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092, node2:9092")
// kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// KafkaProducer是一个泛型,必须指定key、value的类型
val producer = new KafkaProducer[String, String](kafkaProps)
// topic, key, value(可以不指定类型)
val record = new ProducerRecord[String, String](topic, "", "my test 中文测试")
producer.send(record)
producer.close()
}
}