大数据开发学习,是每一个优秀的程序员进入大数据行业的必修课程。大数据技术人才的诞生并不是一蹴而就的,需要经历好几轮的打磨和实践,才能成就为合格的大数据工程师。而恰好千锋教育大数据培训为你成功的打开了通往大数据的一扇门,精准分析,精细讲解,助你一臂之力。下面是千锋大数据开发培训班的讲师分享给大家的一个技术点——“100TB的挑战及优化”
成功运行TPC-DS 100 TB数据集中的所有SQL,对于Apache Spark来说也是一大挑战。虽然SparkSQL官方表示支持TPC-DS所有的SQL,但这是基于小数据集。在100TB这个量级上,Spark暴露出了一些问题导致有些SQL执行效率不高,甚至无法顺利执行。在做实验的过程中,我们在自适应执行框架的基础上,对Spark也做了其它的优化改进,来确保所有SQL在100TB数据集上可以成功运行。以下是一些典型的问题。
统计map端输出数据时driver单点瓶颈的优化(SPARK-22537)
在每个map任务结束后,会有一个表示每个partition大小的数据结构(即下面提到的CompressedMapStatus或HighlyCompressedMapStatus)返回给driver。而在自适应执行中,当一次shuffle的map stage结束后,driver会聚合每个mapper给出的partition大小信息,得到在各个partition上所有mapper输出的数据总大小。
该统计由单线程完成,如果mapper的数量是M,shuffle partition的数量为S,那么统计的时间复杂度在O(M x S) ~ O (M x S x log(M x S)) 之间,当CompressedMapStatus被使用时,复杂度为这个区间的下限,当HighlyCompressedMapStatus被使用时,空间有所节省,时间会更长,在几乎所有的partition数据都为空时,复杂度会接近该区间的上限。
在M x S增大时,我们会遇到driver上的单点瓶颈,一个明显的表现是UI上map stage和reduce stage之间的停顿。为了解决这个单点瓶颈,我们将任务尽量均匀地划分给多个线程,线程之间不相交地为scala Array中的不同元素赋聚合值。
在这项优化中,新的spark.shuffle.mapOutput.parallelAggregationThreshold(简称threshold)被引入,用于配置使用多线程聚合的阈值,聚合的并行度由JVM中可用core数和M * S / threshold + 1中的小值决定。
Shuffle读取连续partition时的优化 (SPARK-9853)
在自适应执行的模式下,一个reducer可能会从一个mapoutput文件中读取诺干个连续的数据块。目前的实现中,它需要拆分成许多独立的getBlockData调用,每次调用分别从硬盘读取一小块数据,这样就需要很多的磁盘IO。我们对这样的场景做了优化,使得Spark可以一次性地把这些连续数据块都读上来,这样就大大减少了磁盘的IO。在小的基准测试程序中,我们发现shuffle read的性能可以提升3倍。
Broadcast HashJoin中避免不必要的partition读的优化
自适应执行可以为现有的operator提供更多优化的可能。在SortMergeJoin中有一个基本的设计:每个reducetask会先读取左表中的记录,如果左表的 partition为空,则右表中的数据我们无需关注(对于非anti join的情况),这样的设计在左表有一些partition为空时可以节省不必要的右表读取,在SortMergeJoin中这样的实现很自然
BroadcastHashJoin中不存在按照join key分区的过程,所以缺失了这项优化。然而在自适应执行的一些情况中,利用stage间的精确统计信息,我们可以找回这项优化:如果SortMergeJoin在运行时被转换成了BroadcastHashJoin,且我们能得到各个partition key对应partition的精确大小,则新转换成的BroadcastHashJoin将被告知:无需去读那些小表中为空的partition,因为不会join出任何结果。