Kafka + Spark Streaming + Redis (createStream and createDirectStream)

Our scenario is analyzing how users behave in a mobile app. The pipeline works as follows:

1. The mobile client collects user behavior events (we use click events as the example) and sends them to a data server; here we assume they land directly in a Kafka message queue.

2. A backend real-time service consumes the data from Kafka and analyzes it as it arrives; we use Spark Streaming for this.

3. The Spark Streaming job computes the results and writes them to Redis.

In this example a simulated producer writes JSON events to Kafka, Spark consumes them, and the results are saved to Redis. Spark Streaming offers two ways to implement the consumer: with a receiver (createStream) and without one (createDirectStream); both are demonstrated below.
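At a glance, the two entry points differ as follows (abridged sketch; the full programs appear in section 3, and the identifiers here are defined there):

// Receiver-based: consumes via ZooKeeper; records are buffered by a
// receiver inside an executor before processing.
val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, storageLevel)

// Receiver-less "direct": offsets are read straight from the brokers,
// one RDD partition per Kafka partition.
val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)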

Before running the example, make sure of the following:

1. Redis has authentication enabled and the password is admin (change as needed; a minimal Jedis connectivity check follows this list). To connect from the CLI:

/usr/local/redis/bin/redis-cli -a admin   # admin is the password

2. ZooKeeper is running on its default port, or adjust the port in the example code.

3. Kafka is running on its default port, or adjust the port in the example code.
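As a sanity check for item 1, you can also verify the Redis password from the JVM with Jedis (a minimal sketch; the object name RedisPing is mine, host and password as configured above):

import redis.clients.jedis.Jedis

object RedisPing {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("hadoop4", 6379)  // same host/port the example uses
    jedis.auth("admin")                     // the password set up above
    println(jedis.ping())                   // prints PONG on success
    jedis.close()
  }
}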

Runtime environment:

centos7

jdk1.8

kafka: kafka_2.11-0.8.2.2

spark: spark-2.2.0-bin-hadoop2.7

scala: 2.11.8

redis: redis-4.0.1

The walkthrough follows.

1. Producing Kafka data

import java.util.Properties
import scala.util.Random
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
// jettison is used here (both JSON libraries are in the dependencies);
// its put() returns JSONObject, so the calls below can be chained
import org.codehaus.jettison.json.JSONObject

/**
 * Kafka producer that simulates user behavior events and writes them to
 * Kafka in real time as JSON. Each event has 4 fields:
 *   1. uid: user ID
 *   2. event_time: timestamp of the event
 *   3. os_type: mobile OS type
 *   4. click_count: number of clicks
 */
object KafkaEventProducer {

  private val users = Array(
    "user_01", "user_02", "user_03", "user_04", "user_05",
    "user_06", "user_07", "user_08", "user_09", "user_10")

  private val random = new Random()
  private var pointer = -1

  // Cycle through the users round-robin
  def getUserID(): String = {
    pointer = pointer + 1
    if (pointer >= users.length) {
      pointer = 0
    }
    users(pointer)
  }

  // Random click count between 0 and 9
  def click(): Double = {
    random.nextInt(10)
  }

  def main(args: Array[String]): Unit = {
    val topic = "kafka_spark_redis_T"
    // Kafka cluster
    val brokers = "hadoop2:9092,hadoop3:9092,hadoop4:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // optional
    props.put("group.id", "sparkTest")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while (true) {
      // prepare event data
      val event = new JSONObject()
      event.put("uid", getUserID)
           .put("event_time", System.currentTimeMillis.toString)
           .put("os_type", "ios")
           .put("click_count", click)

      // produce event message
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      println("Message sent: " + event)

      // control produce rate
      Thread.sleep(200)
    }
  }
}
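Run the producer first; it emits one event every 200 ms and logs each message. The output looks like this (values are illustrative):

Message sent: {"uid":"user_03","event_time":"1505270531256","os_type":"ios","click_count":7}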

2. Building the Redis connection pool

import redis.clients.jedis.{JedisPool, JedisPoolConfig}

object RedisClient extends Serializable {

  val redisHost = "hadoop4"   // Redis server
  val redisPort = 6379
  val redisTimeout = 30000
  val MAX_ACTIVE: Int = 1024
  val MAX_IDLE: Int = 200
  val MAX_WAIT: Int = 10000
  val TEST_ON_BORROW: Boolean = true
  val AUTH = "admin"          // auth password

  val config: JedisPoolConfig = new JedisPoolConfig
  config.setMaxTotal(MAX_ACTIVE)
  config.setMaxIdle(MAX_IDLE)
  config.setMaxWaitMillis(MAX_WAIT)
  config.setTestOnBorrow(TEST_ON_BORROW)

  lazy val pool = new JedisPool(config, redisHost, redisPort, redisTimeout, AUTH)

  // Shutdown hook that destroys the pool when the JVM exits
  lazy val hook = new Thread {
    override def run = {
      println("Execute hook thread: " + this)
      pool.destroy()
    }
  }
  sys.addShutdownHook(hook.run)
}
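Once the streaming job (next section) is populating the hash, you can read the aggregated counts back through the same pool. A minimal sketch (the object name ClickCountReader is hypothetical):

import scala.collection.JavaConverters._

object ClickCountReader {
  def main(args: Array[String]): Unit = {
    val jedis = RedisClient.pool.getResource
    try {
      // hgetAll returns the whole <uid, count> hash written by the streaming job
      val counts = jedis.hgetAll("app_users_click").asScala
      counts.foreach { case (uid, count) => println(s"$uid -> $count") }
    } finally {
      jedis.close()  // in Jedis 2.9, close() returns a pooled connection to the pool
    }
  }
}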

3. Consuming the data with Spark Streaming and storing the results in Redis

KafkaUtils.createStream
 
import net.sf.json.JSONObject
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object Ka_spark_redis {

  def main(args: Array[String]): Unit = {
    val topics = "kafka_spark_redis_T"  // must match the producer's topic
    val numThreads = 3

    val zkQuorum = "hadoop2:2181"       // ZooKeeper address; may be a quorum list
    val group = "sparkTest"             // matches the group.id set in the producer
    val sparkConf = new SparkConf().setAppName("Ka_spark_redis_T").setMaster("local[2]")
    Logger.getLogger("spark").setLevel(Level.WARN)

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val clickHashKey = "app_users_click"  // Redis hash holding <uid, count> pairs

    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)

    // Each message value is JSON, e.g.:
    // {"uid":"user_02",
    //  "event_time":"1505270531256",
    //  "os_type":"Android",
    //  "click_count":4}
    val events = data.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      Some(data)
    })

    // Compute user click times
    val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          val uid = pair._1
          val clickCount = pair._2
          val jedis = RedisClient.pool.getResource
          jedis.hincrBy(clickHashKey, uid, clickCount)
          RedisClient.pool.returnResource(jedis)
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
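One caveat with the receiver-based approach: received blocks live in executor memory before processing, so a driver crash can drop data. Spark Streaming's write-ahead log plus checkpointing mitigates this. A sketch of how the setup lines in main above would change (the checkpoint path is a hypothetical example):

// Persist received blocks to the checkpoint directory before processing
val sparkConf = new SparkConf().setAppName("Ka_spark_redis_T").setMaster("local[2]")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, Seconds(5))
// The WAL requires a checkpoint directory (HDFS in a real deployment)
ssc.checkpoint("hdfs:///tmp/kafka_spark_redis_checkpoint")  // hypothetical path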

KafkaUtils.createDirectStream

import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object UserClickCountAnalytics {

  def main(args: Array[String]): Unit = {
    val masterUrl = "local[1]"

    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configurations
    val topics = Set("kafka_spark_redis_T")
    val brokers = "hadoop2:9092,hadoop3:9092,hadoop4:9092"
    val groupId = "sparkTest"  // not needed by the direct API; kept for reference
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder"
    )
    val clickHashKey = "app_users_click"

    // Create a direct stream: no receiver; offsets are read from the brokers
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      Some(data)
    })

    // Compute user click times
    val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          val uid = pair._1
          val clickCount = pair._2
          val jedis = RedisClient.pool.getResource
          jedis.hincrBy(clickHashKey, uid, clickCount)
          RedisClient.pool.returnResource(jedis)
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
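A practical bonus of the direct API: each batch RDD carries the exact Kafka offset ranges it read, which the receiver-based variant cannot give you. A minimal sketch of logging them, to be added right after kafkaStream is created above:

import org.apache.spark.streaming.kafka.HasOffsetRanges

kafkaStream.foreachRDD { rdd =>
  // The direct stream's RDDs implement HasOffsetRanges; the cast only works
  // on the unmodified stream, before any transformation.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} [${o.partition}] offsets ${o.fromOffset} -> ${o.untilOffset}")
  }
}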

4. Dependencies

This example depends on Spark and Scala (versions listed above). Some of the Maven dependencies are shown below; an sbt sketch follows after the XML.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

<dependency>
    <groupId>org.codehaus.jettison</groupId>
    <artifactId>jettison</artifactId>
    <version>1.3.8</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.43</version>
</dependency>

<dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.3</version>
    <!-- json-lib is published on Maven Central with classifiers; if resolution fails, add <classifier>jdk15</classifier> -->
</dependency>

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.2</version>
</dependency>
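For reference, if you build with sbt instead of Maven, roughly equivalent coordinates would look like this (a sketch covering only what this example uses; the json-lib classifier is my assumption about how that artifact is published):

// build.sbt (sketch)
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark"     %% "spark-streaming"           % "2.2.0",
  "org.apache.spark"     %% "spark-streaming-kafka-0-8" % "2.2.0",
  "org.codehaus.jettison" % "jettison"                  % "1.3.8",
  "net.sf.json-lib"       % "json-lib"                  % "2.3" classifier "jdk15",
  "redis.clients"         % "jedis"                     % "2.9.0",
  "org.apache.commons"    % "commons-pool2"             % "2.2"
)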