Spark的概念和搭建(含代码使用)
  SkyPJX5vy5h5 2023年11月02日 42 0

一,概念

1,Spark虽然也是阿帕奇的,但是它开发时仅用于分布式数据计算,和hadoop并无关系,spark有自己的资源调度(master和worker)和任务管理模块(driver和executor,dirver和MR中的AM一样是进程中的老大,而exe是普通员工,负责一个进程的)。这些模块的功能和hadoop中的yarn是差不多的。Hadoop中有分布式计算框架MapRudece,也是用来分布式计算的,但是MR的计算能力完全不如spark。

2,spark是spark,Hadoop是Hadoop,二者没联系。前者擅长分布式计算,后者擅长分布式存储,本来企业中应该同时搭建它们2个的,但是为了节约存储,弄出来一种spark on yarn的模式,这使得spark可以运行作为一款应用运行在hadoop上了,而且spark还不用像之前那样繁琐的分布式安装,只需要作为应用安装在hadoop集群中的一台机子上(像hive那样)。

3,Spark有哪些模块?


Spark的概念和搭建(含代码使用)_spark

核心SparkCore、SQL计算(SparkSQL)、流计算(SparkStreaming)、图计算(GraphX)、机器学习(MLlib)其中Spark cores是所有模块的基础

二,spark的三种搭建模式


Spark的概念和搭建(含代码使用)_数据_02

1,三种模式

2,Local模式多用于测试,它只需要在一台机器上就可以模拟完成分布式。且它运行时一个任务只创建一个进程,它用线程模拟整个spark运行。


Spark的概念和搭建(含代码使用)_spark_03

Spark的概念和搭建(含代码使用)_python_04

Spark的概念和搭建(含代码使用)_spark_05

3,StandAlone模式,是在多台虚拟机上部署spark。这种方式中spark和hadoop是没有任何关系的。每一台机子都要安装spark包,也要像搭建hadoop一样进行主从角色规划。Spark共有三种角色

4,spark on yarn,只要在hadoop集群中的一台机器上安装spark就可。这种模式下spark负责资源调度的master和worker不工作,而是把资源调度交给了yarn。Spark中只有负责任务管理的driver和executor工作。

5,Spark on yarn有两种搭建模式,终端模式(又叫客户端模式)和集群模式。他们的区别只有driver运行的位置不同,executor都一样,只有“进程的老大”driver运行位置不一样。

终端模式中driver运行在终端上(就是命令窗口)。在命令窗口中输入命令运行pyspark程序,若是终端模式则driver会在终端的进程里面,而是集群模式的话,driver和exe在一起(就像上图那样)

终端模式中。Driver在终端里面,exe却在集群中,这使得driver和exe之间需要频繁通信,会降低效率,但好处是程序执行结果可以直接输出到终端。集群模式的结果只能去日志里面翻。


Spark的概念和搭建(含代码使用)_数据_06

(开发测试时用终端模式,正式发行时用集群模式)

6,在saprk安装包下运行bin/pyspark --master yarn --deploy-mode cluster即可以on yarn方式开启spark(开启之前要保证Hadoop正常工作)。运行成功的话,会出现斜杠拼出的spark界面,且提示有一个webUI网址。

# --deploy-mode 选项是指定部署模式, 默认是客户端模式

# client就是客户端模式

# cluster就是集群模式

三,pyspark应用程序

1,它位于saprk安装包的sbin目录下,直接写它的路径名+pyspark就可以运行。Spark是分布式计算框架,spark支持多种语言包括Java和python。Pyspark程序运行起来后,我们可以交互式的写python代码,然后程序会把python解析好让spark来处理。

2,想要运行pyspark需要安装好python环境。这里建议直接在虚拟机中安装anaconda。(spark on yarn的话应该只需要在装spark的那台机器上安装好python环境)

3,Anaconda是一个python软件包,里面包含了python本身和许多其他软件包(有点像java中的各种依赖)。如果电脑中安装了许多个python,则ana可以构造为项目虚拟环境,指定该系统使用哪个python版本。默认的虚假环境是base,我们也可以创建虚拟环境,还可以通过active 虚拟环境名来指定使用哪个环境。

刚安装好ana如果不显示默认虚拟环境base则执行

source /1filerj/export/server/anaconda3/bin/activate

4,pyspark只能交互式运行python语句,要想一次性执行一个.py文件的话,需要在spark安装目录的sbin下运行spark-submit。

5,有一个类库的名字也叫pyspark,但它和pyspark应用程序完全不一样,要区分开。Pyspark类库它可以用import导入python代码中,是我们编写spark代码的一个非常重要的“头文件”,就和Java中的jar包依赖一样。

Linux中除了搭建python环境外,还需要用pip下载好pyspark类库。由于虚拟机没有新建python虚拟环境,所以只需要在默认的base环境中下载这个包就行。

四,pycharm远程连接到linux集群上

1,新建了一个工程后,需要点菜单栏的tool->development->configuration->mappings去配置本地的python工程目录和linux上的工程目录的对应关系。因为在运行程序的时候,pycharm会先把本工程的所有文件上传到linux上去。


Spark的概念和搭建(含代码使用)_python_07

2,远程运行普通的python代码只要求linux安装好了python环境。但是运行pyspark需要点击下图,找到里面的环境变量,添加虚拟机中JAVA_HOME的路径。在pycharm中编写spark处理代码还需要linux中已经安装好了pyspark包。


Spark的概念和搭建(含代码使用)_python_08


五,RDD

1,RDD(Resilient Distributed Dataset)弹性分布式数据集。RDD是一个数据集【可理解为专门用来存储spark计算时需要数据的一种数据结构】,就像数组,集合,字典那样。Spark在进行分布式计算时,首先需要从一个文件中读取数据(文件一般位于hdfs中),spark会创建一个RDD去存放被读取的数据。RDD它是分区的,它里面有许多子区,数据会分别存放在不同的区,在运算时,各个区可以并行运算,每个分区的数据都executor管理。Exe其实就是一个进程,一个进程可以管理一个或多个分区。

RDD中的“弹性”是指,RDD中的数据可以存储在内存中或者磁盘中;RDD的“分布式“是指数据是分布式存储的,可用于分布式计算。

2,RDD特性

(1)对RDD使用一个方法,该方法会作用在它的每个分区上。所以对RDD使用的方法叫做算子。

①算子分为两类,转换算子transformation和行动算子action转换算子。

②转换算子是lazy load懒加载的,遇到转换算子不会执行,只有遇到action算子了,才会执行该算子前面的转换算子,然后再执行action算子。转换算子相当于在构建计划,而action才能启动计划

③转换算子的返回值一定也是RDD

(2)RDD之间是有血缘关系(依赖关系)的,RDD是有旧的RDD更新迭代出来的,新RDD产生后内存会释放旧RDD。

3,RDD的数据是过程数据,当新的RDD产生后,老的RDD就会被回收。RDD被转换算子算换的过程就像人类社会老人去世,婴儿出生一样。所以说RDD之间有血缘关系。不仅转换算子会使得老RDD消失,在执行action算子的时候,它所依赖的RDD也会消失

下图中,执行第一行的collect的时候,前面这些转换算子才会开始执行,执行到collect的时候只剩下rdd4存在了。而第二行的collet需要用到rdd3,此时rdd3已经销毁,所以还必须从rdd1开始迭代。即绿色框内的会被执行两次。



Spark的概念和搭建(含代码使用)_python_09

4,我们可以用API把还要用到的rdd缓存起来,这样用它的时候就不用从头迭代。下面的方法可以把rdd存到内存中,当内存不够用就存到磁盘。使用它需要从pyspark导入新方法StorageLevel。

rdd.persist(StorageLevel.MEMORY_AND_DISK)

清理掉缓存的rdd

rdd.unpersist()

备注1:rdd缓存是分散存储的,就是每个分区都分别把数据存储到自己的分区附近,数据不是先汇总再存储的。

备注2:如果是小的rdd,可以直接rdd.cache()。就是把这个rdd存入缓存

5,磁盘存储rdd。CheckPoint。


Spark的概念和搭建(含代码使用)_python_10

上面那种缓存rdd是不安全的,存储的rdd遇到意外会找不到。但是缓存保留了rdd之间的血缘关系,找不到了,只需要从头生成就可。而CheckPoint是把数据汇总之后再存到磁盘,是不易失的,但是不会保留rdd血缘关系。

6,RDD的迭代过程与pycharm代码的对应(WordCount代码为例)

①pycharm中使用textFile读入HDFS中的一个文件,此时RDD被自动创建,其中还有多个分区,每个分区存一部分数据。

②使用了flatMap,通过空格来区分每个单词,把区分开的单词放到集合里。此时RDD1迭代成了RDD2。

③使用了map语句,把单个的单词转换成了(单词,1)的键值对的形式。RDD2迭代成了RDD3

④使用了分组映射再加上+操作,把key相同的分到一组,再把他们的value加起来。RDD3迭代成RDD4。

⑤执行了collect命令,各个分区的ececutor把自己的结果汇总到了Driver上。由Driver输出最后的结果。



Spark的概念和搭建(含代码使用)_spark_11

Spark的概念和搭建(含代码使用)_python_12



六,spark RDD编程

(一)构建出RDD

1,基本步骤。导入头文件,创建RDD编程的入口对象sc。

Spark的概念和搭建(含代码使用)_spark_13





2,获取外部数据,并把数据作为RDD存储(RDD是分布式的)。有两种获取方式

Spark的概念和搭建(含代码使用)_spark_14

(1)把本程序的数据转(集合、数组等)为分布式的RDD。【并行化创建】






(2)读取外部文件,可以是本地文件,也可以是HDFS上的文件。有两个方法可以读取,它们的区别仅仅是,后者一般用来读取“小文件“。前者读取大文件。使用whole的时候返回的rdd结果会把文件路径一并返回,而不是仅返回文件内容,所以我们一般使用textFile


Spark的概念和搭建(含代码使用)_python_15

①rdd1=sc.textFile(“文件路径“,”最小分区数“)


Spark的概念和搭建(含代码使用)_python_16

②rdd1=sc.wholeTextFiles(“文件路径“,”最小分区数“)

(二)常用的RDD工具

1,获取这个RDD的分区数。(放进print中直接打印出来)

rdd1.getNumPartitions()

2,对象.split(“xx”)以什么为分隔符切割数据。

Split一般只与flatMap一起使用,因为它是映射+展平。在wordsCount中,如果使用map+split后面转成KV时会出错,变成整个文件后面才加上一个value,而不是每个单词后面都加上一个value。

(三)转换算子

1,备注:转换算子的传入参数一般是一个函数。算子接收到“实参函数“后,会把RDD中的数据作为函数形参,让RDD数据去执行函数体中的语句。


Spark的概念和搭建(含代码使用)_python_17

2,Map算子。会使得RDD中的数据全部执行Map参数(是一个函数)的函数体中的操作

改进:①函数可以使用lamda表达式,直接在算子的参数中定义②不必使用rdd1,rdd2这样。直接rdd=rdd.xxx,这样写其实两个rdd也不是一样的了。

2,FlatMap它和Map的区别就是,它会把collect结果集中间的[ ]去掉。


Spark的概念和搭建(含代码使用)_数据_18

Spark的概念和搭建(含代码使用)_python_19

注意,我们不可以把同一个list先用map转换一次,再用faltMap转换,这样第二次转换时会提示找不到list。因为第一次转换时list就已经转到RDD中去了。

3,ruduceBykey,它是专门用于键值对型的RDD数据的,会先按照key分组,再对key相同的同组元素的key执行操作,操作有实参函数指定。相当于数据库的Group By+count等聚合函数。

Rudece是减少,也可以理解为聚合。


Spark的概念和搭建(含代码使用)_数据_20

它的执行步骤是,不是直接从1加到5,而是一直在

Spark的概念和搭建(含代码使用)_数据_21

a+b,把原来的和作为a,后一个数作为b这样相加。但是结果和直接加起来是一样的。

4,GroupBy,按函数所指定的方法分组。

GroupByKey则是自动按照key分组,把RDD更新为分组之后的数据

Spark的概念和搭建(含代码使用)_spark_22

把value转换为集合list,不然无法直接查看value值,只能看到迭代器等单词。

5,filter过滤器

Lambda结果是true就接收,否就会被过滤掉

Spark的概念和搭建(含代码使用)_python_23


6,distinct用来去重,没有参数


Spark的概念和搭建(含代码使用)_数据_24

Spark的概念和搭建(含代码使用)_python_25

7,并集Union,交集intersection。它们都不会自动去重


Spark的概念和搭建(含代码使用)_数据_26

8,连接操作join


Spark的概念和搭建(含代码使用)_spark_27

9,glom按分区加嵌套。无参数直接rdd.gom

10,sortByKey,按key进行排序(文件夹排序那种,按字母数字大小排序)


Spark的概念和搭建(含代码使用)_python_28

Spark的概念和搭建(含代码使用)_python_29

numP设置一般都为1,表示RDD所有数据都排序,若为分区数,则分区内排序,一般不会出现其他数字了。



Spark的概念和搭建(含代码使用)_spark_30

11,sortBy

(四)spark运行在yarn上

1,使用yarn方式运行,不可以读取本地数据了,只能使用Hadoop中的文件。

2,要加上一句os.environ['HADOOP_CONF_DIR']='/1filerj/export/server/hadoop-3.3.5'

3,把setMaster的参数从local改成yarn即可


Spark的概念和搭建(含代码使用)_python_31

4,如果本py文件中import了工程中的其他py文件

5,local模式也是可以访问到hdfs文件系统的,因为local模式运行的虚拟机可以通过hdfs提供的ip地址和端口找到文件。我们平时只用local模式即可。

(五)action算子。返回值都不是RDD

1,collect。把各个分区中的数据收集到driver中,再以list的形式返回


Spark的概念和搭建(含代码使用)_spark_32

2,countByKey。统计某个key一共出现了几次。

3,rudece按函数指定的方式聚合数据


Spark的概念和搭建(含代码使用)_数据_33

Spark的概念和搭建(含代码使用)_spark_34

4,first和take。前者取rdd中的第一个元素,后者取rdd的前x个元素,以集合形式返回。无视分区,是取RDD中的元素不会管分区


Spark的概念和搭建(含代码使用)_python_35

5,count,计算RDD中一共有几个数据


Spark的概念和搭建(含代码使用)_python_36

6,saveAsTextFile,把RDD中的数据全部写入HDFS的文件中。它是executor直接执行的命令,不会经过driver,所以有几个分区就会产生几个文件。(它是少有的不经过driver 的,其他的action算子一般都经过)

七,广播变量和累加器

(一)广播变量

1,在python代码中,会存在两类变量,一类是本地变量,一类是rdd变量。本地变量存在与driver中(即在终端中),而rdd变量在集群中。如果在一句代码中会同时用到这两个变量,则它们之间需要进行通信。

下图例子,想要把rdd中的学生学号,替换成本地list中的名字。相当于数据库连接,此时两种类型的变量会产生关联。

这种情况下使用的策略是,

Spark的概念和搭建(含代码使用)_spark_37

Spark的概念和搭建(含代码使用)_数据_38

Spark的概念和搭建(含代码使用)_spark_39

driver把本地数据给每一个分区都发送一份。但是这样其实没必要,因为可能一个exe管理着多个分区,driver只需要把数据发给每个exe即可,而不用发给每个分区。

Executor就是进程管理器,而进程间的资源是共享的。







Spark的概念和搭建(含代码使用)_spark_40


2,如果直接把两种数据关联起来,则它们采用的是图1的方法。想要采用图2方法,必须用到广播变量。

Spark的概念和搭建(含代码使用)_数据_41


①rdd将会和哪个本地数据关联,就把那个数据提前放入broadcast中。

②在rdd和本地数据关联时(一般即是在rdd算子的参数函数中使用本地数据),就从broadcast中取value,代码中就不要写list本身,而是用value代替。


Spark的概念和搭建(含代码使用)_spark_42

3,当数据小时,使用广播变量其实比shuffle运行快

(二)累加器

1,没有累加器的情况演示

理想的运行情况,(因为代码中有2处print count)第一处count的值应该从1一直打印到10,而第二处是最终的count,值应该是10.


Spark的概念和搭建(含代码使用)_数据_43

实际结果是,第一处count是从1打印到5,再从1打印到5。而第二处的count值是0.

2,上诉情况原因是,count是本地变量,在rdd的参数函数中使用它时,count只是把自己的值发送给了各个分区【值传递】。各个分区拿到count后,也只在本分区内部打印它的值,所以才是2次1到5。第二处count打印的是本地数据的count,前面的改变都是在rdd中的,本地数据的count一直没变。

3,如果不用累加器,就无法实现本地数据、rdd的两个分区的数据同步。只有使用累加器才能做到数据共享(就像静态变量,一个地方改变了它则所有地方的它都会随之改变)。累加器不仅能累加,把它做静态变量,只要需要同步的地方就用它。


Spark的概念和搭建(含代码使用)_python_44

4,累加器的常见问题

Spark的概念和搭建(含代码使用)_spark_45


Rdd3其实没有进行其他操作,就只是把rdd2重新collect了以下,本来count应该是10啊

发生原因是,在rdd2.collect之后rdd2就会被销毁。Rdd3它后面又要使用rdd2,这使得重新会从头构建rdd2,count的值已经变成10了,再执行一般rdd2导致count有被加一遍,值就会是20。

解决办法是在rdd2.collect前面加一个rdd2.cache,把rdd2存入缓存。

八,spark内核调度

1,一个运行的python程序,在spark中就是一个appllication。每个application都含有一个driver和多个executor。

Driver被创建出来后,它会①负责创建spark的入口地址SparkContext,然后它还要②创建出本python运行文件的DAG图。一个executor就是一个进程,它所需要的资源由WORKER提供(on yarn模式则是Yarn提供)

2,DAG即有向无环图,运行一个python文件时,driver创建好入口地址后,都会根据代码创建出本py文件的DGA。代码中每一个action语句都对应着一个DGA,下图其实是3个DGA图画到一起了。其中每一个DGA图之间都可以并行工作,所以又把叫做一个job。

1个运行着的py文件=1个application

1个actinotallow=1个DGA=1个job

Spark的概念和搭建(含代码使用)_数据_46



Spark的概念和搭建(含代码使用)_python_47

上图只能体现出一个py文件的执行过程,从图中体现不出分区。其实不仅仅是job是并行工作的,一个job还包含了几个分区,各个分区之间也是并行工作的。完整的DGA图,如下。下图只是一个DGA图,而上图中py文件包含了3个DGA

3,宽窄依赖


Spark的概念和搭建(含代码使用)_数据_48

4,一个DGA图还会被划分成几个stage。是以shuffle为界限划分的。这就保证一个stage中都是窄依赖,不存在shuffle。“一个分区的一个stage阶段“就叫做一个”内存迭代管道PIPline“,也就是一个pipLine中不需要通过网络IO取获取别的管道的数据,它只会使用到自己管道内部的数据。

PipLine数目等于stage数目*分区数

每一个管道都交给一个线程,使它们能够并行运行。把它们叫做一个Task。


Spark的概念和搭建(含代码使用)_spark_49

粉色的stage的第一个RDD,它需要等到黄色部分运行完,获取到数据之后才能运行,但是依旧要分线程给它,因为shuffle要保证双方都有线程。

九,SparkSQL

1,RDD可以处理结构化,半结构化,非结构化的数据,而SparkSQL的数据集DataFrame只可以处理结构化数据。即RDD可以处理任意结构的数据,DF只可以处理二维表数据。对于数据RDD可以是数组数组,集合数据,字符串数据等等,而DF只能是二维表。RDD中有多个分区,DF中的数据也是一样,不过DF中每个分区存储的都是表的几行,即依然保持着二维表的这种数据结构

2,DataFrame(类似于二维表)的结构定义。DF被2种数据结构描述。StructType和StructField,前者表示的是“表结构“,后者表示”列结构“。StructField结构的三个参数分别是:列名、列类型、列是否运行为空。

Spark的概念和搭建(含代码使用)_python_50


3,DF数据层面的结构,DF可以获取“行数据“和”列数据“。Row对象例如(1,张三,男,24岁)

Column对象则除了包含一列数据还包括了列的信息(即前面的structField)

十,SparkSQL代码

(一)基本


Spark的概念和搭建(含代码使用)_python_51

1,SparkContext只能作为RDD的入口地址,而SparkSession既可以作为RDD入口地址,也可以作为DF的。

(二)构建出DF

1,DataFrame对象可以从RDD转换而来,都是分布式数据集其实就是转换一下内部存储的结构,转换为二维表结构。

(1)方法1。Rdd数据首先要做一些准备工作,才能被转换成DF,即使用split分开数据,

也可以重新指定数据的类型(行三所示)。最后的构建

Spark的概念和搭建(含代码使用)_数据_52

df方法,参数2是指定数据的列名。


Spark的概念和搭建(含代码使用)_spark_53

(2)方法2.

(3)方式三。使用RDD的toDF方法


Spark的概念和搭建(含代码使用)_python_54


2,读取外部文件构建DF。特别注意,如果指定的format是text则构建的表只会有一列,列名默认


Spark的概念和搭建(含代码使用)_spark_55

Spark的概念和搭建(含代码使用)_spark_56

是value

(三)DF的两种操作方式

1,DataFrame支持两种风格进行编程,分别是:DSL风格,SQL风格。前者即使用DF类中的API进行操作。

(四)DSL的一些API

1,df.show()展示表中的数据,默认20条,参数2指定列最大包含多少字符。

Spark的概念和搭建(含代码使用)_数据_57


2,打印表结构,即structType

df.printSchema()


Spark的概念和搭建(含代码使用)_python_58

5,df.select()输出指定列。在使用它之前要先获取到列对象。


Spark的概念和搭建(含代码使用)_python_59

6,filter和where完全相等,可直接替换。它们用作筛选。

7,groupBy

下图是

Spark的概念和搭建(含代码使用)_spark_60

先分组再聚合,使用了聚合函数count

这个方法的返回值是GroupedData对象。它是一个特殊的DataFrame数据集内部记录了分组之后的DF数据。

8,对数据进行去重

Spark的概念和搭建(含代码使用)_数据_61


9,dropna删除带有null的行

Spark的概念和搭建(含代码使用)_spark_62


10,fillna方法。重新指定null的值

Spark的概念和搭建(含代码使用)_spark_63

11,存储DF数据

Spark的概念和搭建(含代码使用)_python_64


(五)使用SQL方法操纵DF

1,在使用SQL操纵DF之前,要把DF注册为表。临时表只可以在当前的SparkSession中使用,二全局表可以在同一个程序的SparkSession之间使用。使用全局表时要在表名之前加上【global_temp.】

Spark的概念和搭建(含代码使用)_spark_65

2,

Spark的概念和搭建(含代码使用)_python_66


(六)使用pyspark.sql包中的一些方法



(七)sparkSQL操作mysql数据库

1,写入mysql。其中dbtable是一个参数,它右边是写数据表名字

Spark的概念和搭建(含代码使用)_spark_67


2,从mysql中读取数据

Spark的概念和搭建(含代码使用)_数据_68











问题解决,如果出现Python in worker has different version 3.9 than that in driver 3.8, PySpark则在代码中加上import os

os.environ['PYSPARK_PYTHON']='/1filerj/export/server/anaconda3/bin/python'


























【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
SkyPJX5vy5h5
最新推荐 更多

2024-05-31