第一章 大数据处理框架概览
大数据处理框架的四层结构
用户层
- 输入数据
- 用户代码
- 配置参数
分布式数据并行处理层
先将用户提交的应用转化为较小的计算任务,然后通过调用底层的资源管理与任务调度层实现并行执行
Spark上应用的转化过程包含两层:逻辑处理流程、执行阶段与执行任务划分
- Spark首先根据用户代码中的数据操作语义和操作顺序,将代码转化为逻辑处理流程,逻辑处理流程是一个DAG图,节点是数据单元RDD
- 框架对逻辑处理流程进行划分,生成物理执行计划,该计划包含多个stage,每个stage包含多个task
资源管理与任务调度层
大数据处理框架一般是主从结构,主节点负责接收用户提交的应用,处理请求,管理应用运行的整个生命周期。从节点负责执行具体的数据处理任务(task),并在运行过程中向主节点汇报任务的执行状态。
Spark支持不同的部署模式,如Local、Standalone、YARN、Mesos等
Standalone与MR部署模式基本类似,唯一的区别是MR部署模式为每个task启动一个JVM进程运行,而且是在task将要运行时启动JVM,而Spark是预先启动资源容器(ExecutorJVM),然后当需要执行task时,再在ExecutorJVM里启动task线程运行。
这就解释了yarn上看到的现象:在提交MR任务的时候,占用的资源是不断变化的,提交spark任务资源是固定的
在MR中,每个task以一个Java进程(ChildJVM)方式运行,好处是task之间相互独立,每个task独享进程资源,监控方便,坏处是task之间不方便共享资源,在执行过程中需要不断启停新旧task,会降低执行效率。
而Spark采用了以线程为最小的执行单位,缺点是线程间会有资源竞争,而且ExecutorJVM的日志会包含多个并行task的日志,较为混乱
Standalone模式适合使用在数据规模不大,集群所部署大数据计算框架单一,整个集群计算资源可以全部提供给spark计算,无需考虑资源分配的问题。
Spark on yarn在国内使用比较广泛。 它具有以下优势:
- 避免搭建spark集群。spark只需要部署在一个节点上,通过该节点,将应用提交给yarn集群处理。在拥有yarn集群的情况下,无需搭建spark集群就可以方便的运行spark应用
- 统一资源管理和调度。yarn是包括mr, spark在内的多种大数据计算引擎的资源管理和调度的实现方式,yarn具有资源隔离机制,使得各种大数据计算任务可以稳定的运行在集群中,而不会产生资源竞争问题。
- 资源弹性管理。yarn通过队列的方式,根据不同类型应用程序的资源压力,弹性调度资源的使用量
物理执行层
物理执行层负责启动task,执行每个task的数据处理步骤。
- 首先确定应用会产生哪些job(根据action操作)
- 其次根据逻辑处理流程中的数据依赖关系,将每个job的处理流程拆分为stage
- 最后对于每一个stage,根据RDD的分区个数确定执行的task个数和种类
task执行过程中主要消耗内存的数据分为以下3类:
- 框架执行时的中间数据。例如map()输出到缓冲区的数据和reduce task在Shuffle阶段暂存到内存中的数据。
- 框架缓存数据。例如用户调用cache()接口缓存到内存中的数据。
- 用户代码产生的中间计算结果。例如用户调用map()、reduce()、combine(),在处理输入数据时会在内存中产生中间计算结果。
为什么要拆分stage?
- stage中的生成的task不会太大,也不会太小,而且是同构的,便于并行执行
- 可以将多个操作放在一个task里处理,使得操作可以进行串行、流水线式的处理,提高效率
- stage可以方便错误容忍,如果一个stage失败,可以重新运行这个stage,而不需要运行整个job
第二章 Spark系统部署与应用运行的基本流程
第三章 Spark逻辑处理流程
Spark逻辑处理概览
逻辑处理流程主要包含四部分:
(1)数据源
(2)数据模型
RDD与普通数据结构(ArrayList)的主要区别:
- RDD只是逻辑概念,在内存中不会真正为某个RDD分配存储空间(除非被缓存)。RDD中的数据只会在计算中产生,计算完成后就会消失,而ArrayList等数据结构常驻内存
- RDD可以包含多个数据分区,不同数据分区可以由不同的task在不同节点进行处理
(3)数据操作 transformation()操作和action()操作
rdd使用transformation后会生成新的rdd,而不能对rdd本身进行修改
在Spark中,因为数据操作一般是单向操作,通过流水线执行,还需要进行错误容忍等,所以RDD被设计成一个不可变类型,可以类比成一个不能修改其中元素的ArrayList。
(4)计算处理结果
直接存放到分布式系统 或者 在Driver端进行集中计算
Spark逻辑处理流程生成方法
宽窄依赖
窄依赖(NarrowDependency):新生成的childRDD中每个分区都依赖parentRDD中的一部分分区
宽依赖(ShuffleDependency):新生成的childRDD中的分区依赖parentRDD中的每个分区的一部分
如果parentRDD的一个或者多个分区中的数据需要全部流入childRDD的某一个或者多个分区,则是窄依赖。
如果parentRDD分区中的数据需要一部分流入childRDD的某一个分区,另外一部分流入childRDD的另外分区,则是宽依赖。
网上常见的解释是错误的,因为ManyToManyDependency是窄依赖,例如Cartesian(笛卡尔积)
groupByKey()一定产生shuffle吗?
不一定。
右图中rdd1已经提前使用Hash划分进行了分区,具有相同Hash值的数据已经在同一个分区,而且设定的groupByKey()生产的RDD的分区个数与rdd1一致,此时不需要shuffle。
如何对RDD内部的数据进行分区
(1)水平划分
根据record的索引进行划分。
例如sparkContext.parallelize(list(1,2,3,4,5,6,7,8,9),3),就是按照元素的下标划分,123为一组,456为一组,789为一组。
使用Spark处理HDFS文件时,HDFS自动对数据进行水平划分,也就是按照128M为单位将输入数据划分为很多个小数据块(block),每个spark task可以只处理一个数据块。
(2)Hash划分(HashPartitioner)
使用record的Hash值来对数据进行划分,好处是只需要知道分区个数,就能将数据确定性的划分到某个分区。
该划分方法经常被用于数据Shuffle阶段。
(3)Range划分(RangePartitioner)
该方法一般适用于排序任务,按照元素的大小关系将其划分到不同分区,每个分区表示一个数据
Spark Shuffle解析
Shuffle的核心要点
ShuffleMapStage与ResultStage
在划分stage时,最后一个stage称为finalStage,它本质上是一个ResultStage对象,前面的所有stage被称为ShuffleMapStage。
ShuffleMapStage的结束伴随着shuffle文件的写磁盘。
ResultStage基本上对应代码中的action算子,即将一个函数应用在RDD的各个partition的数据集上,意味着一个job的运行结束。
HashShuffle解析
未优化的HashShuffle
这里我们先明确一个假设前提:每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。
如下图中有3个 Reducer,从Task 开始那边各自把自己进行 Hash 计算(分区器:hash/numreduce取模),分类出3个不同的类别,每个 Task 都分成3种类别的数据,想把不同的数据汇聚然后计算出最终的结果,所以Reducer 会在每个 Task 中把属于自己类别的数据收集过来,汇聚成一个同类别的大集合,每1个 Task 输出3份本地文件,这里有4个 Mapper Tasks,所以总共输出了4个 Tasks x 3个分类文件 = 12个本地小文件。
优化后的HashShuffle
优化的HashShuffle过程就是启用合并机制,合并机制就是复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。
这里还是有4个Tasks,数据类别还是分成3种类型,因为Hash算法会根据你的 Key 进行分类,在同一个进程中,无论是有多少过Task,都会把同样的Key放在同一个Buffer里,然后把Buffer中的数据写入以Core数量为单位的本地文件中,(一个Core只有一种类型的Key的数据),每1个Task所在的进程中,分别写入共同进程中的3份本地文件,这里有4个Mapper Tasks,所以总共输出是 2个Cores x 3个分类文件 = 6个本地小文件。
SortShuffle解析
普通SortShuffle
在该模式下,数据会先写入一个数据结构,reduceByKey写入Map,一边通过Map局部聚合,一边写入内存。Join算子写入ArrayList直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。
在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为10000条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个Task过程会产生多个临时文件。
最后在每个Task中,将所有的临时文件合并,这就是merge过程,此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个Task的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。
bypass SortShuffle
bypass运行机制的触发条件如下:
1) shuffle reduce task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,默认为200。
2) 不是聚合类的shuffle算子(比如reduceByKey)。
此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于:不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
Spark为什么比Hive快
Spark是内存计算,Hadoop其实也是内存计算。Spark和Hadoop的根本差异是多个任务之间的数据通信问题:Spark多个任务之间数据通信是基于内存,而Hadoop是基于磁盘。
(1)消除了冗余的HDFS读写
Spark只有在shuffle的时候将数据写入磁盘,而Hadoop中多个MR作业之间的数据交互都要依赖于磁盘交互。
(2)Spark的缓存机制比HDFS的缓存机制高效。
(3)JVM的优化
Hadoop每次MapReduce操作,启动一个Task便会启动一次JVM,基于进程的操作。而Spark每次MapReduce操作是基于线程的,只在启动Executor时启动一次JVM,内存的Task操作是在线程复用的。
第四章 Spark物理执行计划
Spark物理执行计划生成方法
执行步骤
(1)根据action()操作顺序将应用划分为job
(2)根据ShuffleDependency依赖关系将job划分为stage
从后往前回溯,遇到ShuffleDependency时停止,划分stage
(3)根据分区计算将各个stage划分为task
根据每个stage最后一个RDD的分区个数决定生成task的个数
相关问题
(1)job、stage、task的计算顺序
当应用程序执行到rdd.action()时,就会立即将job提交给Spark。
每个stage的输入数据要么是job的输入数据,要么是上游stage的输出结果。
因此,计算顺序从包含输入数据的stage开始,从前到后依次执行,仅当上游的stage都执行完成后,再执行下游的stage。上图中stage0和stage1都包含了job的输入数据,两者可以先开始计算,仅当两者都完成后,stage2才开始计算。
stage中的每个task是独立而且同构的,可以并行运行
(2)task内部数据的存储与计算问题(流水线计算)
第一种模式
每次读取一个record,计算并生成一个新record。
计算record1时并不需要知道record2是什么,这种情况可以采用”流水线“式计算。 在计算时还需要在内存中保留当前被处理的单个record即可,不需要保存其他record或者已经被处理完的record。有效减少内存使用空间。
第二种模式
f()函数每生成一条数据,都进入g()函数的iter.next()中进行计算,g()函数需要在内存中保存这些中间计算结果,并在输出时将中间结果依次输出。
有些不用保存中间结果,比如求max,只保存当前最大的即可。
第三种模式
先执行f()函数,完成后再计算g()函数。
f()函数的输出结果需要保存在内存中,而g()函数计算完每个record’并得到record’’后,可以对record’进行回收。
第四种模式
由”流水线“式退化到“计算——回收”模式,每执行完一个操作,回收之前的中间计算结果。
(3)task之间的数据传递与计算问题
ShuffleDependency的数据划分方法包括Hash划分、Range划分等,也就是要求上游stage预先将输出数据进行划分,按照分区存放,分区个数与下游task的个数一致,这个过程被称为“Shuffle Write”。
按照分区存放完成后,下游的task将属于自己分区的数据通过网络传输获取,然后将来自上游不同分区的数据聚合在一起进行处理,这个过程被称为“Shuffle Read”。
第五章 迭代型Spark应用
第六章 Shuffle机制
Shuffle的设计思想
解决数据分区和数据聚合问题
(1)数据分区问题(Shuffle Write阶段)
1.如何确定分区个数?
分区个数可以由用户自定义,一般被定义为集群中可用CPU个数的1~2倍。
如果用户没有定义,则默认分区个数是parent RDD的分区个数的最大值。
2.如何对map task输出数据进行分区?
map task每输出一个<K,V>record,根据K计算partitionId,具有不同partitionId的record被输出到不同的分区(文件)中。
(2)数据聚合问题(Shuffle Read阶段)
两步聚合:
先将record存放到HashMap中,再使用func()函数对list(V)进行计算
缺点:所有Shuffle的record都会先被存放到HashMap中,占用内存空间较大。
优化方案——在线聚合:
在每个record加入HashMap时,同时进行func()聚合操作,并更新相应的聚合结果。
解决map()端combine问题
需要进行combine操作:
combine的目的是减少Shuffle数据量,只有包含聚合函数的操作需要进行map端combine,如reduceByKey()、foldByKey()、aggregateByKey()等。
解决方案:
combine和Shuffle Read端的聚合过程没有区别,都是将<K,V>record聚合成<K,func(list(V))>,不同的是,Shuffle Read是来自所有map task输出的数据,而combine是来自单一task的数据。
因此仍然先利用HashMap进行combine,然后对HashMap中每一个record进行分区,输出到对应的分区文件中。
解决sort问题
何时进行排序,即如何确定排序和聚合的顺序?
方案1:先排序再聚合
MR采用的方案——先使用Array,存储Shuffle Read的<K,V>record,然后对Key进行排序,排序后的数据可以直接从前到后进行扫描聚合。
缺点:需要较大内存空间存储线性数据结构,排序和聚合不能同时进行,效率较低。
方案2:排序和聚合同时进行
使用TreeMap
缺点:TreeMap的排序复杂度高,不适合数据规模非常大的情况
方案3:先聚合再排序
Spark采用的方案——维持现有基于HashMap的聚合方案,将HashMap中的record或record的引用放入线性数据结构中进行排序。之前的在线聚合方案不需要改动。
缺点:需要复制数据或引用,空间占用较大。
解决内存不足问题
内存+磁盘混合存储。
如果内存不足,将内存中的数据spill到磁盘。在进行下一步数据操作之前对磁盘上和内存中的数据进行再次聚合(全局聚合)。为了加速聚合,需要将数据spill到磁盘上时进行排序,这样全局聚合才能够按顺序读取spill到磁盘上的数据,减少磁盘I/O。
Spark中Shuffle框架的设计
Shuffle Write框架设计和实现
框架计算顺序:map()输出 -> 数据聚合 -> 排序 -> 分区
map task每计算出一个record及其partitionId,就将record放入类似HashMap中进行聚合,聚合完成后再将数据放入类似Array中进行排序,既可按照partitionId,也可以按照partitionId+Key进行排序,最后根据partitionId将数据写入不同的数据分区中,存放到本地磁盘。
Shuffle Read框架设计和实现
框架计算顺序:数据获取 -> 聚合 -> 排序
reduce task不断从各个map task的分区文件中获取数据,然后使用类似HashMap的结构对数据进行聚合,边获取边聚合。聚合完成后放入类似Array中按照Key进行排序,最后将排序结果输出或者传递给下一个操作。
与MapReduce的Shuffle机制对比
MR Shuffle:
两个明显的阶段:map stage和reduce stage。
在map stage中,每个map task首先执行map(K,V)函数,再读取每个record,并输出新的<K,V> record。这些record被输出到一个固定大小的spill buffer里(一般100MB),spill buffer如果被填满就将spill buffer中的record按照Key排序后输出到磁盘上。
这个过程类似Spark将map task输出的record放到一个排序数组(PartitionedPairBuffer)中,不同的是MR是严格按照Key进行排序的,而PartitionedPairBuffer排序更灵活(可以按照partitionId进行排序,也可以按照partitionId+Key排序)。
由于spill buffer中的record只进行排序,不能聚合,所以MR在完成map()、等待所有record都spill到磁盘后,启动一个专门的聚合阶段,使用combine将所有spill文件中的record进行全局聚合,得到最终聚合结果。
注意,这里需要进行多次全局聚合,因为每次只针对某个分区的spill文件进行聚合。
在Shuffle Read阶段,MR先将每个map task输出的相应分区文件通过网络获取,然后放入内存,如果内存放不下,就先对当前内存中的record进行聚合和排序,再spill到磁盘。等获取所有的分区文件时,可能存在多个spill文件及内存中剩余的分区文件,这时再启动一个专门的reduce阶段来将这些内存和磁盘上的数据进行全局聚合。
这个过程和Spark的全局聚合过程没有什么区别。
MR Shuffle优点:
1.MR的Shuffle流程固定,每个阶段读取什么数据、进行什么操作、输出什么数据都是确定性的。实现起来更容易。
2.MR的内存消耗是确定的,map阶段只需要一个大的spill buffer,reduce阶段只需要一个大的数组(MergeQueue)。这样,什么时候将数据spill到磁盘上是确定的,易于实现和内存管理。用户定义的聚合函数如combine和reduce的内存消耗是不确定的。
3.MR对Key进行了严格排序,使得可以使用最小堆或最大堆进行聚合,非常高效。
4.MR按Key进行排序和spill到磁盘的功能,可以在Shuffle大规模数据时仍然保证顺利进行。
MR Shuffle缺点:
1.强制按Key排序,大多数应用不需要,如groupByKey,排序增加计算量。
2.不能在线聚合,不管是map端还是reduce端,都实现将数据放到内存或者磁盘上后再执行聚合,存储这些数据需要消耗大量内存和磁盘。
3.产生的临时文件过多,如果map task个数为M,reduce task个数为N,map阶段集群会产生M×N个分区文件。
Spark如何克服这些缺点
对于强制排序,Spark提供了按partitionId排序,按Key排序等多种方式
对于不能在线聚合,Spark采用hash-based聚合,也就是利用HashMap的在线聚合特性。
对于临时文件过多,Spark是将多个分区文件合并为一个文件,按照partitionId的顺序存储。
第七章 数据缓存机制
与MapReduce的缓存机制对比
MR的缓存机制DistributedCache不是用于存放job运行的中间结果,而是用于缓存job运行所需的文件,如所需的jar文件、每个map task需要读取的辅助文件、一些文本文件等。
DistributeCache将缓存文件放在每个Worker的本地磁盘上,并不是内存中。
Spark job一般包含多个操作,按照DAG图方式执行,也适用于迭代型应用,因此会产生大量中间数据和可复用的数据,Spark为这些数据设计了基于内存和磁盘的缓存机制,可以更好地加速应用执行。
缺陷:缓存的RDD数据是只读的,不能修改;
当前的缓存机制不能根据RDD的生命周期进行自动缓存替换。
缓存数据只能job间共享,应用之间不能共享。
第八章 错误容忍机制
重新计算机制
重新执行失效的task时,是否还需要执行其上游stage中的task?
Spark采用了“延时删除策略”——将上游stage的Shuffle Write的结果写入本地磁盘,只有在当前job完成后,才删除Shuffle Write写入磁盘的数据。即使某个task执行失败,也可以再次通过Shuffle Read读取到同样的数据。
所以Spark根据ShuffleDependency切分出的stage既保证了task的独立性,也方便了错误容忍的重新计算。
Spark采用了一种被称为lineage(计算链)的数据溯源方法。
核心思想是在每个RDD中记录其上游数据是什么,以及当前RDD是如何通过上游数据(parentRDD)计算得到的。
如果计算链上有缓存数据,从缓存数据处截断计算链,即可得到简化后操作。
checkpoint机制的设计与实现
checkpoint时机及计算顺序
如果像缓存一样,每计算出一个record,就将其缓存到内存或磁盘中,效率很低,因为checkpoint将数据持久化到HDFS时需要写入磁盘,而且一般是3份跨节点存储,且写入时延时高。同时,后续操作需要从HDFS中读取数据,读取代价也很高。
Spark的方案是:用户设置rdd.checkpoint()后只标记某个RDD需要持久化,计算过程像正常一样计算,等到当前job计算结束时再重新启动该job计算一遍,对其中需要checkpoint的RDD进行持久化。需要checkpoint的RDD会被计算两次。
Spark推荐用户将需要checkpoint的数据先进行缓存,这样额外启动的任务只需要将缓存数据进行checkpoint即可,不需要再重新计算RDD。
checkpoint的写入过程不仅对RDD进行持久化,而且会切断该RDD的lineage。
cache + checkpoint
cache并不能切断lineage,因为缓存的数据并不可靠,一旦丢失,还需要根据lineage重新计算。
如果既缓存也进行checkpoint,应用先对数据进行缓存,然后checkpoint,读取时读取缓存数据而非checkpoint数据。
cache与checkpoint的区别
(1)目的不同
缓存的目的是加速计算,即加速后续运行的job;
checkpoint的目的是在job运行失败后能够快速恢复,即加速当前需要重新运行的job。
(2)存储性质和位置不同
缓存是为了读写速度快,主要使用内存,偶尔使用磁盘;
checkpoint为了可靠读写,因此使用HDFS。
(3)写入速度和规则不同
缓存速度快,对job执行时间影响小,因此可以在job运行时进行缓存;
checkpoint写入慢,为了减少对当前job的影响,会额外启动专门的job进行持久化。
(4)对lineage的影响不同
checkpoint会切断lineage
(5)应用场景不同
缓存适用于会被多次读取、占用空间不是非常大的RDD;
checkpoint适用于数据依赖关系比较复杂、重新计算代价较高的RDD。
第九章 内存管理机制
应用内存消耗来源及影响因素
来源1:用户代码
map、reduceByKey这种每读入一条数据立即调用func进行处理并输出结果,中间计算结果不进行存储,内存消耗可以忽略不计。
如果用户在算子操作中定义了数组,并在数组中存放中间计算结果,这些中间计算结果会造成内存消耗。
来源2:Shuffle机制中产生的中间数据
Shuffle Write阶段:
如果需要进行combine聚合,Spark会将record存放到类似HashMap中进行聚合,HashMap会占用大量内存空间。
最后,Spark会按照partitionId或者Key对record进行排序,这个过程可能会使用数组保存record,也会消耗一定的内存。
Shuffle Read阶段:
该阶段将来自不同map task的分区数据进行聚合、排序,得到结果后进行下一步计算。
首先分配一个缓冲区暂存从不同map task获取的record,这个buffer需要消耗一些内存。
然后,如果需要聚合,将采用类似HashMap的结构,会占用大量内存。
最后,如果需要对key排序,可能会建立数组,消耗一定的内存。
来源3:缓存数据
Spark框架内存管理模型
静态内存管理模型
Spark1.6之前将内存划分为3个分区,每个分区负责存储内存消耗来源的一种。
(1)数据缓存空间:约占60%,用于存储RDD缓存数据、广播数据、task的一些计算结果等。
(2)框架执行空间:约占20%,用于存储Shuffle中间数据。
(3)用户代码空间:约占20%,用于存储代码的中间计算结果、Spark框架本身产生的内部对象,以及ExecutorJVM自身的一些内存对象等。
缺点:分区之间存在“硬”界限,难以平衡三者的内存消耗。
统一内存管理模型
Spark1.6开始,依然将内存划分为3个分区,但统一内存管理模型使用“软”界限来调整分区的占比。
该内存模型可以使用ExecutorJVM的堆内内存和堆外内存。
ExecutorJVM的整个内存空间划分为以下3个部分。
(1)系统保留内存(Reserved Memory)
使用较小的空间存储Spark框架产生的内部对象(如Spark Executor对象,TaskMemoryManager对象等Spark内部对象),通过spark.testing.ReservedMemory默认设置为300M。
(2)用户代码空间(User Memory)
用于存储用户代码生成的对象,约占40%的内存。
(3)框架内存空间(Framework Memory)
包括框架执行空间和数据缓存空间。约占60%的内存。
两者共享这个空间,其中一方空间不足时可以动态向另一方借用。
Framework Memory的堆外内存空间:
为了减少GC开销,统一内存管理机制也允许使用堆外内存,该空间不受JVM垃圾回收机制管理,在结束使用时需要手动释放。
堆外内存只用于框架执行空间和数据缓存空间,而不用于用户代码空间。
堆外内存的管理方式和功能与堆内内存的Framework Memory一样。
在运行应用时,Spark会根据应用的Shuffle方式及用户设定的数据缓存级别来决定使用堆内内存还是堆外内存。
Spark框架执行内存消耗与管理
Shuffle Write
Shuffle Read
内存管理方法总结
(1)内存消耗来源多种多样,难以统一管理
内存消耗3个方面:Shuffle数据、数据缓存、用户代码。
Spark采用统一内存管理模型,通过“硬界限+软界限”的方法限制每个区域的内存消耗,并通过内存共享达到平衡。
如何解决内存空间不足的问题——框架执行空间或者数据缓存空间不足时可以向对方借用,如果还不够,则会采取spill到磁盘、缓存数据替换、丢弃等方法。
(2)内存消耗动态变化难以预估,为内存分配和回收带来困难
Spark采取边监控边调整,如通过监控Shuffle Write/Read过程中数据结构大小来观察是否达到框架执行空间界限、监控缓存数据大小观察是否到达数据缓存空间界限,如果达到界限,则进行数据结构扩容、空间借用或者将缓存数据移出内存。
(3)task之间共享内存,导致内存竞争
在Spark的统一内存管理模型中,框架执行空间和数据缓存空间都是有并行运行的task共享的,为了平衡task间的内存消耗,Spark采取均分的方法限制每个task的最大使用空间(1/N*ExecutorMemory),同时保证task的最小使用空间(1/2N*ExecutorMemory)。
内存管理优化
(1)堆内内存和堆外内存管理问题
Spark主要利用堆内内存来进行数据Shuffle和数据缓存,内存消耗高、GC开销大。
虽然部分Shuffle方式可以利用堆外内存,但主要适用于无聚合、无排序的场景,而且需要用户自己设定堆外内存大小。
使用堆外内存虽然可以降低GC开销,但也有弊端,如应用场景受限,也容易出现内存泄漏等问题。
所以,Spark还需要提高堆内内存和堆外内存的利用能力,降低用户负担,提高内存利用率。
(2)存储与计算分离问题
当前,Spark的统一内存管理模型将数据缓存和数据计算都放在一个内存空间中进行,会产生内存竞争问题。
(3)更高效的Shuffle方式
针对RDD操作,Spark目前只提供了Serialized Shuffle Write方式,没有提供Serialized Shuffle Read方式。