Spark Streaming中整合Spark SQL与HDFS
  ILwIY8Berufg 2023年11月02日 50 0

Maven依赖

<dependencies>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-core_2.11</artifactId>
		<version>2.2.2</version>
	</dependency>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-streaming_2.11</artifactId>
		<version>2.2.2</version>
	</dependency>
</dependencies>

先测试一下环境,是否与Spark Streaming连接

WordCount.scala

package blog

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @Author Daniel
  * @Description 测试SparkStreaming连接
  **/

object WordCount {
  def main(args: Array[String]): Unit = {
    if (args == null || args.length < 2) {
      println(
        """
          |Usage: <host> <port>
        """.stripMargin)
      System.exit(-1)
    }
    val Array(host, port) = args
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("WordCount")
    //batchduration表示每一次提交的含义是每隔多长时间产生一个批次batch,即提交一次sparkstreaming作业
    val batchInterval = Seconds(2)
    //编程入口
    val ssc = new StreamingContext(conf, batchInterval)

    //具体业务

    //为了容错,流式数据的特点,一旦丢失就找不回来了,所以要进行持久化
    val input: ReceiverInputDStream[String] = ssc.socketTextStream(host, port.toInt, StorageLevel.MEMORY_AND_DISK_SER_2)
    //wordcount
    val retDStream: DStream[(String, Int)] = input.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
    //打印结果
    retDStream.print()
    //启动
    ssc.start()
    //保证streaming作业持续不断的运行
    ssc.awaitTermination()
  }
}

设置参数为hadoop01 9999

Spark Streaming中整合Spark SQL与HDFS_hadoop

通过nc来测试

首先安装

sudo yum -y install nc

打开端口9999

nc -lk hadoop01 9999

启动程序,并在传入信息

Spark Streaming中整合Spark SQL与HDFS_spark_02

可以看到结果成功被输出到控制台

Spark Streaming中整合Spark SQL与HDFS_spark_03

测试环境没问题了之后,进行与HDFS的整合

StreamingHDFS.scala

package blog

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @Author Daniel
  * @Description Spark Streaming 整合HDFS
  **/

//sparkstreaming和hdfs整合 读取hdfs中新增的文件
object StreamingHDFS {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingHDFS")
      .setMaster("local")
    val batchInterval = Seconds(2)
    val ssc = new StreamingContext(conf, batchInterval)
    //        val input:DStream[String] = ssc.textFileStream("file:///F:/data/")//读取本地文件
    //读取hdfs中的文件,监控HDFS上文件的变化
    val input: DStream[String] = ssc.textFileStream("hdfs://bde/data/words")
    val ret = input.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
    //遍历每个rdd
    ret.foreachRDD((rdd, time) => {
      //如果RDD不为空则输出
      if (!rdd.isEmpty()) {
        println(s"Time: $time")
        rdd.foreach(println)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

拷贝hdfs-site.xml与core-site.xml到当前目录!!

Spark Streaming中整合Spark SQL与HDFS_spark_04

准备一些数据文件,上传到hdfs

1.txt

hello
word
hello
ww
lily
hadoop
hadoop
spark
hive
spark
hive
hadoop
hello
word
lily
hadoop
hadoop
spark
hive
spark
hive
hadoop

启动程序,上传文件至hdfs

hdfs dfs -put 1.txt /data/words/

Spark Streaming中整合Spark SQL与HDFS_hadoop_05

只要是流式的文件操作,Streaming都能监控到,所以可以自行写一个写文件操作

WriteFile.java

package blog

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

/**
  * @Author Daniel
  * @Description 流式写入文件到HDFS
  **/

object WriteFile {
  def main(args: Array[String]): Unit = {
    //设置用户名避免无权限
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    val uri = new URI("hdfs://bde/")
    val fs = FileSystem.newInstance(uri, new Configuration())
    val fos = fs.create(new Path("/data/words/write.txt"))
    fos.write("hello spark\nhello streaming\nhello successfully".getBytes())
    fs.close()

  }
}

Spark Streaming中整合Spark SQL与HDFS_spark_06

接下来对Spark SQL进行整合

StreamingSQL.scala

package sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @Author Daniel
  * @Description Spark Streaming 整合Spark SQL
  *              统计不同品牌下的不同产品的销售情况
  *              数据格式:
  *              001|mi|moblie
  *              005|huawei|moblie
  *
  **/
object StreamingSQL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingSQL")
      .setMaster("local[*]")
    //提交频率
    val batchInterval = Seconds(2)
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, batchInterval)
    val lines = ssc.socketTextStream("hadoop01", 9999)
    //使用checkpoint进行管理
    ssc.checkpoint("file:///E:/data/checkpoint/ck2")
    //计算的是截止到目前为止的状态信息
    val usb = lines.map(line => {
      val key = line.substring(line.indexOf("|") + 1)
      (key, 1)
      //spark streaming算子操作
    }).updateStateByKey[Int]((seq: Seq[Int], option: Option[Int]) => Option(seq.sum + option.getOrElse(0)))
    //top3
    usb.foreachRDD((rdd) => {
      if (!rdd.isEmpty()) {
        import spark.implicits._
        //实例化RDD
        val rowRDD = rdd.map { case (brandCategory, count) => {
          val brand = brandCategory.substring(0, brandCategory.indexOf("|"))
          val category = brandCategory.substring(brandCategory.indexOf("|") + 1)
          MyRow(category, brand, count)
        }
        }
        val df = rowRDD.toDF("category", "brand", "count")
        df.createOrReplaceTempView("sale")
        val sql =
          """
            |select
            |  category,
            |  brand,
            |  count,
            |  row_number() over(partition by category order by count desc) as rank
            |from sale
            |having rank < 4
          """.stripMargin
        spark.sql(sql).show()
      }
    })


    ssc.start()
    ssc.awaitTermination()
  }
}

case class MyRow(category: String, brand: String, count: Int)

发送数据

001|mi|moblie
002|mi|moblie
003|mi|moblie
004|mi|moblie
005|huawei|moblie
006|huawei|moblie
007|huawei|moblie
008|Oppo|moblie
009|Oppo|moblie
010|uniqlo|colthing
011|uniqlo|colthing
012|uniqlo|colthing
013|uniqlo|colthing
014|uniqlo|colthing
015|selected|colthing
016|selected|colthing
017|selected|colthing
018|Armani|colthing
019|lining|sports
020|nike|sports
021|adidas|sports
022|nike|sports
023|anta|sports
024|lining|sports
025|lining|sports
nc -lk hadoop01 9999

Spark Streaming中整合Spark SQL与HDFS_apache_07

输出如下

Spark Streaming中整合Spark SQL与HDFS_hadoop_08

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  F36IaJwrKLcw   2023年12月23日   42   0   0 idesparkidesparkDataData
ILwIY8Berufg
最新推荐 更多

2024-05-31