Spark Core:Scala单词计数
  TEZNKK3IfmPf 2023年11月13日 17 0

1、实验描述

  • 利用Scala语言开发Spark WordCount程序
  • 实验时长:
  • 45分钟
  • 主要步骤:
  • 创建spark项目
  • 编写wordcount 示例程序
  • 运行Scala 程序
  • 查看实验结果

2、实验环境

  • 虚拟机数量:3(一主两从,主机名分别为:master、slave01、slave02)
  • 系统版本:Centos 7.5
  • Hadoop版本:Apache Hadoop 2.7.3
  • Spark版本:Apache Spark 2.1.1
  • JDK 版本:1.8.0_131
  • Scala版本: scala2.11.11
  • IDEA版本:ideaIC-2017.2.7

3、相关技能

  • Spark集成开发环境IDEA开发程序
  • Scala编写Spark程序

4、知识点

  • 常见linux命令的使用
  • Scala IDEA编程
  • Spark RDD
  • Scala原理

5、实验效果

spark RDD 单词计数结果查看操作最终效果如下图:

203_Spark Core:Scala单词计数

203_Spark Core:Scala单词计数

6、实验步骤

6.1打开虚拟机并启动Hadoop集群

6.1.1在master启动Hadoop集群

[zkpk@master ~]$ cd hadoop-2.7.3/
[zkpk@master hadoop-2.7.3]$ sbin/start-all.sh
[zkpk@master hadoop-2.7.3]$ hdfs dfsadmin -safemode  leave

203_Spark Core:Scala单词计数

203_Spark Core:Scala单词计数

6.1.2在master上运行jps,确认NameNode, SecondaryNameNode, ResourceManager进程启动

203_Spark Core:Scala单词计数

6.1.3在slave01上运行jps,确认DataNode, NodeManager进程启动

203_Spark Core:Scala单词计数

6.1.4在slave02上运行jps,确认DataNode, NodeManager进程启动

203_Spark Core:Scala单词计数

203_Spark Core:Scala单词计数

6.2打开IDEA,配置软件包依赖,创建工程

[zkpk@master ~]$ cd idea-IC-172.4574.19/
[zkpk@master idea-IC-172.4574.19]$ nohup bin/idea.sh &

6.2.1进入如下界面,点击 Create New Project

203_Spark Core:Scala单词计数

6.2.2进入如下图界面,按照图标依次点击,最后点击next

203_Spark Core:Scala单词计数

6.2.3依次输入GroupId和ArtifactId和Version的值,随后点击next

203_Spark Core:Scala单词计数

6.2.4进入如下界面,设置本地Maven项目的setting.xml文件和warehouse仓库,点击next按钮

6.2.4.1本地setting.xml文件在/home/zkpk/apache-maven-3.5.0/conf目录下

6.2.4.2本地仓库文件夹warehouse在/home/zkpk/apache-maven-3.5.0/warehouse

203_Spark Core:Scala单词计数

6.2.5进入如下界面,输入工程名称spark_test,然后点击next

203_Spark Core:Scala单词计数

203_Spark Core:Scala单词计数

6.2.6进入如下界面,即表示工程spark_test创建成功

203_Spark Core:Scala单词计数

6.2.7如5.1.6步骤所示,工程创建完成后会自动打开一个名为zkpk的xml文件,删除如下图标红部分的依赖

203_Spark Core:Scala单词计数

6.2.8在xml文件中找到properties配置项,修改scala版本号(此处对应scala安装版本),并添加spark版本号(此处对应spark安装版本)

203_Spark Core:Scala单词计数

6.2.9找到dependency配置项,添加如下图标红部分的配置,分别是scala依赖和spark依赖,${scala.version}表示上述配置的scala.version变量

203_Spark Core:Scala单词计数

6.2.10一般修改pom.xml文件后,会提示enable auto-import,点击即可,如果没有提示,则可以点击工程名,依次选择Maven—->Reimport,即可根据pom.xml文件导入依赖包

203_Spark Core:Scala单词计数

6.2.11设置语言环境language level,点击菜单栏中的file,选择Project Structure

203_Spark Core:Scala单词计数
6.2.12弹出如下对话框,选择Modules,选择Language level为8,然后点击Apply,点击OK

203_Spark Core:Scala单词计数

6.2.13设置Java Compiler环境,点击菜单栏中的file,选择Setting

203_Spark Core:Scala单词计数

6.2.14弹出如下对话框,依次选择Build,Execution—->Compiler—->Java Compiler,设置图中的Project bytecode version为1.8,设置图中的Target bytecode version为1.8,然后依次点击Apply和OK

203_Spark Core:Scala单词计数

6.2.15如下图删除测试环境test中的测试类

203_Spark Core:Scala单词计数
6.2.16如下图删除,main文件夹中,包名下的App文件

203_Spark Core:Scala单词计数

6.2.17至此,Spark Maven工程创建完毕

6.3编写Scala程序完成Spark单词计数

6.3.1如下图依次打开src—>main—>scala,在org.zkpk.lab上点击右键,创建Scala Class

203_Spark Core:Scala单词计数

6.3.2弹出如下对话框,输入类名ScalaWordCount,点击ok

203_Spark Core:Scala单词计数
6.3.3在类ScalaWordCount中新建伴生对象object ScalaWordCount

203_Spark Core:Scala单词计数

6.3.4在伴生对象object ScalaWordCount中创建main方法

203_Spark Core:Scala单词计数

6.3.5在main方法中创建列表List对象并赋值给常量list,列表中包含4个元素,分别是:“hello
hi hi spark”,“hello spark hello hi sparksql”,“hello hi hi sparkstreaming”,“hello hi sparkgraphx”

203_Spark Core:Scala单词计数
6.3.6创建SparkConf对象,对Spark运行属性进行配置,调用该对象的setAppName方法设置Spark程序的名称为“word-count”,调用setMaster方法设置Spark程序运行模式,一般分为两种:本地模式和yarn模式,这里我们采用本地模式,参数为“local[*]”,属性设置完成后赋值给常量sparkConf

6.3.7创建SparkContext,参数为sparkconf,赋值给常量sc,该对象是Spark程序的入口

203_Spark Core:Scala单词计数

6.3.8调用SparkContext对象sc的方法parallelize,参数为列表对象list,该方法使用现成的scala集合生成RDD
lines,类型为String(RDD为Spark计算中的最小数据单元),该RDD存储元素是字符串语句

203_Spark Core:Scala单词计数

6.3.9调用RDD对象lines的flatMap方法按照空格切分RDD中的字符串元素,并存入新的RDD对象words中,参数类型为String,该RDD存储的元素是每一个单词

203_Spark Core:Scala单词计数

6.3.10调用RDD对象words的map方法,将RDD中的每一个单词转换为kv对,key是String类型的单词,value是Int类型的1,并赋值给新的RDD对象wordAndOne,参数为(String,Int)类型键值对

203_Spark Core:Scala单词计数

6.3.11调用RDD对象wordAndOne的reduceByKey方法,传入的参数为两个Int类型变量,该方法将RDD中的元素按照Key进行分组,将同一组中的value值进行聚合操作,得到valueRet,最终返回(key,valueRet)键值对,并赋值给新的RDD对象wordAndNum,参数为(String,Int)类型键值对

203_Spark Core:Scala单词计数
6.3.12调用RDD对象wordAndNum的sortBy方法,第一个参数为kv对中的value,即单词出现次数,第二个参数为boolean类型,true表示升序,false表示降序

203_Spark Core:Scala单词计数

6.3.13调用ret对象的collect方法,获取集合中的元素,再调用mkString方法,参数为“,”,将集合中的元素用逗号连接成字符串,调用println方法打印输出在控制台

203_Spark Core:Scala单词计数

6.3.14调用ret对象的saveAsTextFile,该方法的参数为运行时指定的参数,此方法的用处是将Spark程序运行的结果保存到指定路径,一般是把结果保存到HDFS中,所以这里的参数定义为:

hdfs://master:9000/sparktest

6.3.14.1HDFS根目录中不能存在sparktest此目录,spark程序会自动创建该目录

6.3.15调用SparkContext对象sc的stop方法,释放spark程序所占用的资源

6.4运行程序,查看结果

6.4.1程序编辑完成后,点击菜单栏上的Run按钮,选择Run…

203_Spark Core:Scala单词计数

6.4.2弹出对话框,选择第一个Edit Configurations

203_Spark Core:Scala单词计数

6.4.3进入运行参数配置页面,点击+号按钮,选择Application,进入Application配置界面

203_Spark Core:Scala单词计数

203_Spark Core:Scala单词计数

6.4.4指定Application的名称Name,主函数Main Class,参数arguments和运行模式VM
options(由于在编写程序是已经指定了本地运行模式,所以这里的运行模式不指定),最后点击运行

203_Spark Core:Scala单词计数

6.4.5查看控制台输出

203_Spark Core:Scala单词计数

6.4.6查看HDFS输出

 203_Spark Core:Scala单词计数203_Spark Core:Scala单词计数 

7、参考答案

代码清单ScalaWordCount

203_Spark Core:Scala单词计数

package org.zkpk.lab
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf,SparkContext}

class ScalaWordCount {

}
object ScalaWordCount{
  def main(args:Array[String]):Unit={
    val list =List("hello hi hi spark",
      "hello spark hello hi sparksql",
      "hello hi hi sparkstreaming",
      "hello hi sparkgraphx")
    val sparkConf=new SparkConf().setAppName("word-count").setMaster("local[*]")
    val sc=new SparkContext(sparkConf)
    val lines:RDD[String]=sc.parallelize(list)
    val words:RDD[String]=lines.flatMap((line:String) => {line.split(" ")})
    val wordAndOne:RDD[(String,Int)]=words.map((word:String) => {(word,1)})
    val wordAndNum:RDD[(String,Int)]=wordAndOne.reduceByKey((count1:Int,count2:Int) => {count1+count2})
    val ret=wordAndNum.sortBy(kv=>kv._2,false)
    println(ret.collect().mkString(","))
    ret.saveAsTextFile(args(0))
    sc.stop()
  }
}

8、总结

本实验是利用scala语言完成spark单词计数,快速掌握Spark程序Scala编程

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月13日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2023年11月15日   38   0   0 apachehadoopjava
  TEZNKK3IfmPf   2023年11月14日   17   0   0 Scala
  TEZNKK3IfmPf   2023年11月14日   13   0   0 Scala
  TEZNKK3IfmPf   2023年11月15日   17   0   0 apachehadoop
  TEZNKK3IfmPf   2023年11月14日   11   0   0 hadoop
  TEZNKK3IfmPf   2023年11月15日   10   0   0 Scalapython
  TEZNKK3IfmPf   24天前   16   0   0 Scala
  TEZNKK3IfmPf   2023年11月14日   12   0   0 Scala
  TEZNKK3IfmPf   24天前   25   0   0 hadoopHive
  TEZNKK3IfmPf   2023年11月14日   14   0   0 Scala
  TEZNKK3IfmPf   2023年11月14日   55   0   0 Scala类型
TEZNKK3IfmPf