SparkCore介绍
  ILwIY8Berufg 2023年11月02日 61 0

1. 什么是Spark

1.1. 大数据生态发展

SparkCore介绍_scala

1.2 什么是Spark

spark官网地址:http://spark.apache.org

Spark是一个通用的可扩展的处理海量数据集的计算引擎。

1.2.1. Spark特点


  • 相比给予MR,官方说,基于内存计算spark要快mr100倍,基于磁盘计算spark要快mr10倍。

SparkCore介绍_SPARK_02

快的原因:

1. 基于内存计算
2. 计算和数据的分离
3. 基于DAGScheduler的计算划分
4. 只有一次的Shuffle输出操作
  • 易用
    Spark提供超过80多个高阶算子函数,来支持对数据集的各种各样的计算,使用的时候,可以使用java、scala、python、R,非常灵活易用。
df = spark.read.json("logs.json") 
df.where("age > 21")
	.select("name.first")
    .show()
  • 通用

SparkCore介绍_spark_03

  • 到处运行

SparkCore介绍_SPARK_04

1.2.2. 人话

什么是Spark呢?它就是一个集成离线计算,实时计算,SQL查询,机器学习,图计算为一体的通用的计算框架。

何为通用?就是在一个项目中,既可以使用离线计算,也可以使用其他比如,SQL查询,机器学习,图计算等等,而这时Spark最最最强大的优势,没有之一。

而这一切的基础是SparkCore,速度比传统的mr快的原因就是基于内存的计算。

Spark开发过程中,使用到的模型——RDD(Resilient Distributed Dataset, 弹性式分布式数据集),在编程中起到了非常重要的作用。

何为RDD,其实RDD就是一个不可变的scala集合。从3个方面理解:

  1. 弹性:如果内存充足,那集合数据的存储和计算,就都在内存中完成;如果内存不足,需要有一部分数据溢出到磁盘,然后在磁盘完成存储和计算。
  2. 分布式:就和之前学习的分布式概念一样,一个集合的数据被拆分成多个部分,这每一个部分被称之为一个分区partition,还是一个scala的不可变的集合。默认情况下,partition是和hdfs中data-block块对应的,spark加载hdfs文件时,一个data-block块对应一个partition。
  3. 数据集:存放数据的集合
    而Spark就是对这个RDD及其集合功能算子的实现。
    RDD,弹性式分布式数据集,是Spark的第一代编程模型,spark预计将在3.0中让rdd光荣退休,转而使用Dataset来完成相应的功能。

2. Spark分布式环境的安装

使用的Spark的版本是2.2.2,最新的版本应该2.4.4。

下载地址:https://archive.apache.org/dist/spark/spark-2.2.2/

提供的安装包:

spark-2.2.2.tgz ---->源码包

spark-2.2.2-bin-hadoop2.7.tgz ---->安装包

2.1. 基于windows的Spark体验

  • 解压

SparkCore介绍_spark_05

  • 启动%SPARK_HOME%\bin目录spark-shell.cmd/spark-shell2.cmd脚本

SparkCore介绍_scala_06

scala> sc.textFile("E:/data/hello.txt")
.flatMap(_.split("\\s+"))
.map((_, 1))
.reduceByKey(_+_)
.foreach(println)

看到结果输出

SparkCore介绍_SPARK_07

同时观察到了web-ui的变化

SparkCore介绍_scala_08

能够得出的基本结论是什么?

  1. Spark的application,可以有非常多的job作业,和mr不同,一个应用就提交一个job就行。
  2. job的执行,好像得需要某些操作触发,否则不会执行,触发的操作就是spark作业执行的动因。
  3. spark job作业的执行是分stage阶段的

SparkCore介绍_SPARK_09

     d. spark job作业的执行stage阶段形成了一个stage的DAG有向无环图

SparkCore介绍_scala_10

2.2. Spark分布式环境安装

2.2.1. 安装scala

  1. 解压
[bigdata@bigdata01 ~]$ tar -zxvf soft/scala-2.11.8.tgz -C app/
  1. 重命名
[bigdata@bigdata01 ~]$ mv app/scala-2.11.8/ app/scala
  1. 添加环境变量
    在当前用户的环境变量配置文件中添加环境变量
[bigdata@bigdata01 ~]$ vim ~/.bash_profile

添加如下内容

export SCALA_HOME=/home/bigdata/app/scala
export PATH=$PATH:$SCALA_HOME/bin
  1. 配置文件生效
[bigdata@bigdata01 ~]$ source ~/.bash_profile
  1. 验证

2.2.2. 安装Spark

  1. 解压
[bigdata@bigdata01 ~]$ tar -zxvf soft/spark-2.2.2-bin-hadoop2.7.tgz -C app/
  1. 重命名
[bigdata@bigdata01 ~]$ mv app/spark-2.2.2-bin-hadoop2.7/ app/spark
  1. 添加环境变量
    在当前用户的环境变量配置文件中添加环境变量
[bigdata@bigdata01 ~]$ vim ~/.bash_profile

添加如下内容

export SPARK_HOME=/home/bigdata/app/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
  1. 环境变量生效
[bigdata@bigdata01 ~]$ source ~/.bash_profile
  1. 修改配置文件
    spark的配置文件,在$SPARK_HOME/conf目录下
    1)、拷贝slaves和spark-env.sh文件
[bigdata@bigdata01 conf]$ cp slaves.template slaves
[bigdata@bigdata01 conf]$ cp spark-env.sh.template spark-env.sh

2)、修改slaves配置

配置spark的从节点的主机名,spark中的从节点叫做worker,主节点叫做Master

[bigdata@bigdata01 conf]$ vim slaves
bigdata02
bigdata03

3)、修改spark-env.sh文件

添加如下内容:

export JAVA_HOME=/opt/jdk
export SCALA_HOME=/home/bigdata/app/scala
export SPARK_MASTER_IP=bigdata01
export SPARK_MASTER_PORT=7077 ##rpc通信端口,类似hdfs的9000端口,不是50070
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/home/bigdata/app/hadoop/etc/hadoop

4)、同步scala和spark到其它节点中

scp -r /home/bigdata/app/scala bigdata@bigdata02:/home/bigdata/app
scp -r /home/bigdata/app/scala bigdata@bigdata03:/home/bigdata/app

scp -r /home/bigdata/app/spark bigdata@bigdata02:/home/bigdata/app
scp -r /home/bigdata/app/spark bigdata@bigdata03:/home/bigdata/app
在bigdata02和bigdata03上加载好环境变量,需要source生效
scp ~/.bash_profile bigdata@bigdata02:/home/bigdata
scp ~/.bash_profile bigdata@bigdata03:/home/bigdata

2.2.2. 启动并体验

  1. 启动
    使用$SPARK_HOME/sbin目录下的脚本start-all.sh,和hadoop中的启动脚本重名,所以建议修改之。
[bigdata@bigdata01 spark]$ mv sbin/start-all.sh sbin/start-spark-all.sh

启动命令便成为了:

start-spark-all.sh

SparkCore介绍_SPARK_11

  1. 停止
    使用$SPARK_HOME/sbin目录下的脚本stop-all.sh
[bigdata@bigdata01 spark]$ mv sbin/stop-all.sh sbin/stop-spark-all.sh

停止命令便成为了:

stop-spark-all.sh

  1. 验证
    启动起来之后,spark的主节点master会类似resourcemanager提供一个web的ui,访问地址为:
    http://master-ip:8080

SparkCore介绍_spark_12

2.3. Spark HA的环境安装

因为在目前情况下,集群中只有一个Master,如果master挂掉,便无法对外提供新的服务,显然有单点故障问题,解决方法就是master的ha。

有两种方式解决单点故障,一种基于文件系统FileSystem(生产中不用),还有一种基于Zookeeper(使用)。

配置基于Zookeeper的一个ha是非常简单的,只需要在spark-env.sh中添加一句话即可。

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata01:2181,bigdata02:2181,bigdata03:2181 -Dspark.deploy.zookeeper.dir=/spark"

spark.deploy.recoveryMode设置成 ZOOKEEPER spark.deploy.zookeeper.urlZooKeeper URL spark.deploy.zookeeper.dir ZooKeeper 保存恢复状态的目录,缺省为 /spark

因为ha不确定master在bigdata01上面启动,所以将

export SPARK_MASTER_IP=bigdata01和export SPARK_MASTER_PORT=7077注释掉

最后别忘了,同步spark-env.sh到其它机器。

同步完毕之后,重启spark集群!

bigdata01的master状态

SparkCore介绍_scala_13

bigdata02也启动master

start-master.sh,其状态为:

SparkCore介绍_SPARK_14

ha验证,要干掉alive的master,观察standby的master

bigdata02的状态缓慢的有standby转变为alive

SparkCore介绍_SPARK_15

2.4. 动态增删一个worker节点到集群

  • 上线一个节点
    不需要在现有集群的配置上做任何修改,只需要准备一台worker机器即可,可和之前的worker的配置相同。

SparkCore介绍_spark_16


[bigdata@bigdata01 spark]$ sbin/start-slave.sh  -c 4 -m 1000M -p 11111  spark://bigdata02:7077
  • 下线一个节点
    kill或者stop-slave.sh都可以

3. Spark核心概念

3.1. 名词解释

  1. ClusterManager:在Standalone(上述安装的模式,也就是依托于spark集群本身)模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器ResourceManager。
  2. Worker:从节点,负责控制计算节点,启动Executor。在YARN模式中为NodeManager,负责计算节点的控制,启动的进程叫Container。
  3. Driver:运行Application的main()函数并创建SparkContext(是spark中最重要的一个概念,是spark编程的入口,作用相当于mr中的Job)。
  4. Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executors。
  5. SparkContext:整个应用的上下文,控制应用的生命周期,是spark编程的入口。
  6. RDD:Spark的基本计算单元,一组RDD可形成执行的有向无环图RDD Graph。
    RDD是弹性式分布式数据集,理解从3个方面去说:弹性、数据集、分布式。
    是Spark的第一代的编程模型。
  7. DAGScheduler:实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中。
    DAGScheduler就是Spark的大脑,中枢神经。
  8. TaskScheduler:将任务(Task)分发给Executor执行。
  9. Stage:一个Spark作业一般包含一到多个Stage。
  10. Task:一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。
    task的个数由rdd的partition分区决定,spark是一个分布式计算程序,所以一个大的计算任务,就会被拆分成多个小的部分,同时进行计算。
  11. Transformations:转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
  12. Actions:操作/行动(Actions)算子 (如:count, collect, foreach等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
  13. SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。SparkEnv内创建并包含如下一些重要组件的引用。
  14. MapOutPutTracker:负责Shuffle元信息的存储。
  15. BroadcastManager:负责广播变量的控制与元信息的存储。
  16. BlockManager:负责存储管理、创建和查找块。
  17. MetricsSystem:监控运行时性能指标信息。
  18. SparkConf:负责存储配置信息。作用相当于hadoop中的Configuration。

3.2. Spark组件官网说明

SparkCore介绍_SPARK_17

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).

Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN), which allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

ILwIY8Berufg
最新推荐 更多

2024-05-31