摘要:本篇文章探讨了大数据分析之深入解析spark2原理—shuffle框架的实现,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。
本篇文章探讨了大数据分析之深入解析spark2原理—shuffle框架的实现,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。
本文分析spark2的shuffle过程的实现的一个概要。
spark2的shuffle过程可以分为shuffle write和shuffle read。shuffle write把map阶段计算完成的数据写入到本地。而shuffle read是从不同的计算节点获取shuffle write计算出来的数据,这样就会发生网络的数据传输和磁盘的i/o。
由于shuffle过程有可能要做以下一些事情:
· 重新进行数据分区
· 数据传输
· 数据压缩
· 磁盘I/O
spark2中的框架主要包括以下几个部分:
l ShuffleManager
这是一个接口,负责管理shuffle相关的组件,比如:通过它来注册shuffle的操作函数,获取writer和reader等。在sparkenv中注册,通过sprkconf进行配置,配置参数是:spark.shuffle.manager,默认是sort,也就是:SortShuffleManager类。在早期的spark版本中,也实现过hashmanager后来全部统一成sort。
l ShuffleReader
在reduce任务中去获取来自多个mapper任务的合并记录数据。实现该接口的类只有一个:BlockStoreShuffleReader。
l ShuffleWriter
在mapper任务中把记录到shuffle系统。这是一个抽象类,实现该抽象类的有:SortShuffleWriter,UnsafeShuffleWriter,BypassMergeSortShuffleWriter三个。
l ShuffleBlockResolver
该接口的实现类需要理解:如何为逻辑的shuffle块标识(map,reduce,shuffle等)获取数据。实现者可以通过文件或文件片段来封装shuffle数据。当获取到shuffle数据时,BlockStore使用它来抽象不同的shuffle实现。该接口的实现类为:IndexShuffleBlockResolver。
1 private[spark] trait ShuffleManager { 2 3 /** 4 * Register a shuffle with the manager and obtain a handle for it to pass to tasks. 5 */ 6 def registerShuffle[K, V, C]( 7 shuffleId: Int, 8 numMaps: Int, 9 dependency: ShuffleDependency[K, V, C]): ShuffleHandle 10 11 /** Get a writer for a given partition. Called on executors by map tasks. */ 12 def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] 13 14 /** 15 * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). 16 * Called on executors by reduce tasks. 17 */ 18 def getReader[K, C]( 19 handle: ShuffleHandle, 20 startPartition: Int, 21 endPartition: Int, 22 context: TaskContext): ShuffleReader[K, C] 23 24 /** 25 * Remove a shuffle's metadata from the ShuffleManager. 26 * @return true if the metadata removed successfully, otherwise false. 27 */ 28 def unregisterShuffle(shuffleId: Int): Boolean 29 30 /** 31 * Return a resolver capable of retrieving shuffle block data based on block coordinates. 32 */ 33 def shuffleBlockResolver: ShuffleBlockResolver 34 35 /** Shut down this ShuffleManager. */ 36 def stop(): Unit 37 }
在spark2.4中实现该接口的类只有一个:SortShuffleManager。下面我们来看一下该类的具体实现。
shuffle的一个不透明句柄(handle),由ShuffleManager用于将有关它的信息传递给任务(task)。
一个获取多个块的迭代器。 对于本地块,它从本地块管理器(local blocks manager)获取。对于远程块,它使用提供的BlockTransferService服务来获取。
它会创建(BlockID,InputStream)元组的迭代器,以便调用者可以在收到块时以流水线方式处理块。
在基于排序的shuffle中,输入的数据会根据其目标分区ID进行排序,然后写入单个map输出文件。 Reducers获取此文件的连续区域,以便读取它们map的输出部分。 如果map输出数据太大而无法放入内存,则可以将输出的已排序数据的一部分写到磁盘,并合并那些磁盘上的文件以生成最终输出文件。
基于排序的shuffle有两个不同的写路径用于生成其映射输出文件:
序列化排序:在满足以下所有三个条件时使用:
nshuffle依赖项指定没有聚合或输出顺序
shuffle序列化程序支持重新定位序列化值(目前这是由KryoSerializer和Spark SQL的自定义序列化程序支持)。
shuffle产生的输出分区少于16777216。
反序列化排序:用于处理所有其他情况。
在序列化排序模式下,输入记录会在它被传递给shuffler writer时被序列化,并在排序期间以序列化形式进行缓冲。 该写入模式实现了几个优化:
· 它的排序操作是序列化的二进制数据而不是Java对象,这样可以减少内存消耗和GC开销。 此优化要求记录序列化程序具有某些属性,以允许重新排序序列化记录,而无需反序列化。 有关详细信息,请参见SPARK-4550,其中首次提出并实施了此优化。
· 它使用专门的缓存高效排序器([[ShuffleExternalSorter]])来排序压缩记录指针和分区ID的数组。 通过在排序数组中每个记录仅使用8个字节的空间,这样就可以把更多的数组放到内存中。
· 溢出合并过程对属于同一分区的序列化记录块进行操作,并且在合并期间不需要对记录进行反序列化。
· 当溢出压缩编解码器支持压缩数据的连接时,溢出合并只是简单地连接序列化和压缩的溢出分区以生成最终输出分区。 这允许使用高效的数据复制方法,如NIO的transferTo,并避免在合并期间分配解压缩或复制缓冲区的需要。
该接口有三个实现类:
· BypassMergeSortShuffleWriter
· SortShuffleWriter
· UnsafeShuffleWriter
该抽象类的实现代码如下:
1 private[spark] abstract class ShuffleWriter[K, V] { 2 /** Write a sequence of records to this task's output */ 3 @throws[IOException] 4 def write(records: Iterator[Product2[K, V]]): Unit 5 6 /** Close this writer, passing along whether the map completed */ 7 def stop(success: Boolean): Option[MapStatus] 8 }
在mapper任务中把记录到shuffle系统。在该类中,写出数据时会使用外部排序对数据进行排序。
该抽象类实现:在reduce任务中获取以从映射器中读取组合记录。目前只有一个实现类:BlockStoreShuffleReader。
1 private[spark] trait ShuffleReader[K, C] { 2 /** Read the combined key-values for this reduce task */ 3 def read(): Iterator[Product2[K, C]] 4 5 /** 6 * Close this reader. 7 * TODO: Add this back when we make the ShuffleReader a developer API that others can implement 8 * (at which point this will likely be necessary). 9 */ 10 // def stop(): Unit 11 }
该类实现了ShuffleReader抽象类,其实就只是实现了一个read函数,该函数返回一个迭代器。
该实现类的主要功能是:当其他节点的块存储(block store)请求分区时,将会读取一个shuffle过程中的多个分区,分区的范围是[startPartition,endPartition]。
讲述了spark2的shuffle框架的实现概要分析。主要分析了spark2中的shuffle框架的实现接口和相关实现类的大概实现逻辑。接下来的文章会对shuffle框架的三个部分的详细实现进行分析。
本文由职坐标整理发布,学习更多的相关知识,请关注职坐标IT知识库!
您输入的评论内容中包含违禁敏感词
我知道了
请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号