kafka生产者的实现
  sjr1u1Lqr9Qx 2023年11月02日 31 0
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// 最简单的消息发送方式
object KafkaProducer1 {
  def main(args: Array[String]): Unit = {
    // brokers的地址,可以写多个。至少写2个避免单点故障
    val brokers = "node1:9092, node2:9092"
    val topic = "mykafka"
    // Properties类实现了Map接口,本质上是一种简单的Map容器
    val kafkaProps = new Properties()

    // 要往Kafka写入消息,首先要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必选的属性
    // bootstrap.servers,指定broker的地址清单。不需要包含所有的broker地址,生产者会从给定的broker里查找其他broker的信息,建议至少提供2个broker的信息
    // key.serializer 将指定的值序列化。必选设置,即使key为空。 value.serializer 将value的值序列化
    kafkaProps.put("bootstrap.servers", brokers)
    kafkaProps.put("key.serializer",    "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("value.serializer",  "org.apache.kafka.common.serialization.StringSerializer")

    // 使用字符串常量更好,避免输入错误
//    kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,      "node1:9092, node2:9092")
//    kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,   "org.apache.kafka.common.serialization.StringSerializer")
//    kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    // KafkaProducer是一个泛型,必须指定key、value的类型
    val producer = new KafkaProducer[String, String](kafkaProps)

    // topic, key, value(可以不指定类型)
    val record = new ProducerRecord[String, String](topic, "", "my test 中文测试")

    producer.send(record)
    producer.close()
  }
}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
sjr1u1Lqr9Qx