目录
3. Spark 入门
3.1 Spark shell 的方式编写 WordCount
3.2 读取 HDFS 上的文件
3.3 编写独立应用提交 Spark 任务
3. Spark 入门
目标
- 通过理解 Spark 小案例, 来理解 Spark 应用
- 理解编写 Spark 程序的两种常见方式
- spark-shell 适合于数据集的探索和测试
- spark-submit 适合于上线
Spark 官方提供了两种方式编写代码, 都比较重要, 分别如下
spark-shell
Spark shell 是 Spark 提供的一个基于 Scala 语言的交互式解释器, 类似于 Scala 提供的交互式解释器, Spark shell 也可以直接在 Shell 中编写代码执行
这种方式也比较重要, 因为一般的数据分析任务可能需要探索着进行, 不是一蹴而就的, 使用 Spark shell 先进行探索, 当代码稳定以后, 使用独立应用的方式来提交任务, 这样是一个比较常见的流程spark-submit
Spark submit 是一个命令, 用于提交 Scala 编写的基于 Spark 框架, 这种提交方式常用作于在集群中运行任务
3.1 Spark shell 的方式编写 WordCount
概要
在初始阶段工作可以全部使用 Spark shell 完成, 它可以加快原型开发, 使得迭代更快, 很快就能看到想法的结果. 但是随着项目规模越来越大, 这种方式不利于代码维护, 所以可以编写独立应用. 一般情况下, 在探索阶段使用 Spark shell, 在最终使用独立应用的方式编写代码并使用 Maven 打包上线运行
接下来使用 Spark shell 的方式编写一个 WordCount
|
Spark shell 简介
|
|
Master地址的设置 Master 的地址可以有如下几种设置方式 Table 3. master |
地址 |
解释 |
|
使用 N 条 Worker 线程在本地运行 |
|
在 Spark standalone 中运行, 指定 Spark 集群的 Master 地址, 端口默认为 7077 |
|
在 Apache Mesos 中运行, 指定 Mesos 的地址 |
|
在 Yarn 中运行, Yarn 的地址由环境变量 |
Step 1 准备文件
在 Node01 中创建文件 /export/data/wordcount.txt
hadoop spark flume
spark hadoop
flume hadoop
Step 2 启动 Spark shell
cd /export/servers/spark
#指定master的运行地址 使用6个线程
bin/spark-shell --master local[6]
Step 3 执行如下代码
以下为精简版:
scala> val sourceRdd = sc.textFile("file:///export/data/wordcount.txt")
sourceRdd: org.apache.spark.rdd.RDD[String] = file:///export/data/wordcount.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> val flattenCountRdd = sourceRdd.flatMap(_.split(" ")).map((_, 1))
flattenCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:26
#当前值+聚合局部结果
scala> val aggCountRdd = flattenCountRdd.reduceByKey(_ + _)
aggCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:28
#注意:只有调用了collect才会求值 --惰性求值
scala> val result = aggCountRdd.collect
result: Array[(String, Int)] = Array((spark,2), (hadoop,3), (flume,2))
#以上代码可以整合为一行
sc.textFile("file:///export/data/wordcount.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect
|
sc 上述代码中 正常情况下我们需要自己创建, 但是如果使用 Spark shell 的话, Spark shell 会帮助我们创建, 并且以变量 |
运行流程
flatMap(_.split(" "))
将数据转为数组的形式, 并展平为多个数据map(_, 1
) 将数据转换为元组的形式reduceByKey(_ + _)
计算每个 Key 出现的次数
总结
- 使用 Spark shell 可以快速验证想法
- Spark 框架下的代码非常类似 Scala 的函数式调用
3.2 读取 HDFS 上的文件
目标
1. 理解 Spark 访问 HDFS 的两种方式
Step 1 上传文件到 HDFS 中
cd /export/data
hdfs dfs -mkdir /dataset
hdfs dfs -put wordcount.txt /dataset/
Step 2 在 Spark shell 中访问 HDFS
val sourceRdd = sc.textFile("hdfs://node01:8020/dataset/wordcount.txt")
val flattenCountRdd = sourceRdd.flatMap(_.split(" ")).map((_, 1))
val aggCountRdd = flattenCountRdd.reduceByKey(_ + _)
val result = aggCountRdd.collect
|
如何使得 Spark 可以访问 HDFS? 可以通过指定 HDFS 的 NameNode 地址直接访问, 类似于上面代码中的 也可以通过向 Spark 配置 Hadoop 的路径, 来通过路径直接访问 1.在
2.在配置过后, 可以直接使用 3.在配置过后, 也可以直接使用路径访问 |
3.3 编写独立应用提交 Spark 任务
目标
- 理解如何编写 Spark 独立应用
- 理解 WordCount 的代码流程
Step 1 创建工程
- 创建 IDEA 工程
- → →
- → →
- 增加 Scala 支持
- 右键点击工程目录
- 选择增加框架支持
- 选择 Scala 添加框架支持
Step 2 编写 Maven 配置文件 pom.xml
- 工程根目录下增加文件
pom.xml
- 添加以下内容
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast</groupId>
<artifactId>spark</artifactId>
<version>0.1.0</version>
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<!-- 继承的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- 因为在
pom.xml
中指定了 Scala 的代码目录, 所以创建目录src/main/scala
和目录src/test/scala
- 创建 Scala object
WordCount
Step 3 编写代码
object WordCounts {
def main(args: Array[String]): Unit = {
// 1. 创建 Spark Context
val conf = new SparkConf().setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
// 2. 读取文件并计算词频
val source: RDD[String] = sc.textFile("hdfs://node01:8020/dataset/wordcount.txt", 2)
val words: RDD[String] = source.flatMap { line => line.split(" ") }
val wordsTuple: RDD[(String, Int)] = words.map { word => (word, 1) }
val wordsCount: RDD[(String, Int)] = wordsTuple.reduceByKey { (x, y) => x + y }
// 3. 查看执行结果
println(wordsCount.collect)
}
}
|
和 Spark shell 中不同, 独立应用需要手动创建 SparkContext |
Step 4 运行
运行 Spark 独立应用大致有两种方式, 一种是直接在 IDEA 中调试, 另一种是可以在提交至 Spark 集群中运行, 而 Spark 又支持多种集群, 不同的集群有不同的运行方式
直接在 IDEA 中运行 Spark 程序
- 在工程根目录创建文件夹和文件
- 修改读取文件的路径为`dataset/wordcount.txt`
- 右键运行 Main 方法
|
spark-submit 命令
Table 4. options 可选参数 |
参数 |
解释 |
|
同 Spark shell 的 Master, 可以是spark, yarn, mesos, kubernetes等 URL |
|
Driver 运行位置, 可选 Client 和 Cluster, 分别对应运行在本地和集群(Worker)中 |
|
Jar 中的 Class, 程序入口 |
|
依赖 Jar 包的位置 |
|
Driver 程序运行所需要的内存, 默认 512M |
|
Executor 的内存大小, 默认 1G |
提交到 Spark Standalone 集群中运行
- 在 IDEA 中使用 Maven 打包
- 拷贝打包的 Jar 包上传到 node01 中
- 在 node01 中 Jar 包所在的目录执行如下命令
spark-submit --master spark://node01:7077 \
--class cn.itcast.spark.WordCounts \
original-spark-0.1.0.jar
|
如何在任意目录执行 spark-submit 命令?
|
总结: 三种不同的运行方式
Spark shell
- 作用
- 一般用作于探索阶段, 通过 Spark shell 快速的探索数据规律
- 当探索阶段结束后, 代码确定以后, 通过独立应用的形式上线运行
- 功能
- Spark shell 可以选择在集群模式下运行, 还是在线程模式下运行
- Spark shell 是一个交互式的运行环境, 已经内置好了 SparkContext 和 SparkSession 对象, 可以直接使用
- Spark shell 一般运行在集群中安装有 Spark client 的服务器中, 所以可以自有的访问 HDFS
本地运行
- 作用
- 在编写独立应用的时候, 每次都要提交到集群中还是不方便, 另外很多时候需要调试程序, 所以在 IDEA 中直接运行会比较方便, 无需打包上传了
- 功能
- 因为本地运行一般是在开发者的机器中运行, 而不是集群中, 所以很难直接使用 HDFS 等集群服务, 需要做一些本地配置, 用的比较少
- 需要手动创建 SparkContext
集群运行
- 作用
- 正式环境下比较多见, 独立应用编写好以后, 打包上传到集群中, 使用`spark-submit`来运行, 可以完整的使用集群资源
- 功能
- 同时在集群中通过`spark-submit`来运行程序也可以选择是用线程模式还是集群模式
- 集群中运行是全功能的, HDFS 的访问, Hive 的访问都比较方便
- 需要手动创建 SparkContext