当用Spark和Hadoop做大数据应用的时候,你可能会反复的问自己怎么解决这一的一个问题:“Container killed by YARN
for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider
boosting spark.yarn.executor.memoryOverhead
”这个错误总会使你的job夭折。它的意思是:因为超出内存限制,集群停掉了container(还没有找到很好的翻译的词)。

   
当我从15T的图像中提取深度学习所得到的特征时,面临内存受到限制的问题,它导致集群会kill掉execitors,尽管这个job可能会运行一天,最终还是会失败。这个数据集合有200000个分区(partitions),我们的spark版本是1.6.2,一般来说,我的脑海中一直有这样一副流程图,它是spark跑任务的流程:





图1 container 内存宏图

   
要做的第一个事情是增加“spark.yarn.executor.memoryOverhead”(增加spark集群中executor的内存),改为4096.为了找到这个可以修改的最大值,我通过增加这个参数的2的次幂,知道集群拒绝我提交这个job。然而,这样做并没有解决问题。

    需要思考的第二个问题是你的数据在各个分区上是否均衡!正如你所见,RDD分布在你的集群上,数据中的每一个slice/piece


/part命名为一个分区。因此分区越多,分区中的数据量就越小。如果我有200000和4的分区,那么每一个分区上包含了50000(200000/4)的图片.

   
通常,从性能上考虑,你想让你的数据平衡。因为每一个并行或者分布式计算的job,希望它们在每一个节点或者线程上分布均匀。举个例子,比如你有四个分区,前三个分区负载20000张图片,最后一个分区复杂了180000张图片,那么前三个分区因为图片数量少,负载较小,跟容易完成这些节点上的工作,然而第四个节点,却又9倍的工作量要处理,显然前三个节点要“等待”第四个节点完成工作,随后才能将结果进行合并。因此如果各个分区上数据不平衡,job进程结束很慢。

    从性能上来说,这种情况很容易导致bug。数据越多意味着需要更多的内存,这将导致内存不够的现象。出发集群kill掉这个container
,意味着job失败。这个意思可以参见:





图2 内存描述和超内存

   
解决内存overhead的问题的方法是:重新分区(repartition)。这样可以保证每一个分区上数据的平衡。但是在实际操作中,并不是那么简单的事情,特别是用python写代码的时候,会出现一些bug,解决讨论方案见http://stackoverflow.com/questions/38799753/how-to-balance-my-data-across-the-partitions,但是scala语言解决这个问题更为鲁棒。其实这是spark版本较低带来的一个bug,将spark升级到2.0之后,能解决下面问题:

16/08/08 20:56:00 ERROR YarnClusterScheduler: Lost executor 68 on
node.of.cluster.com: Container killed by YARN for exceeding memory limits. 12.4
GB of 12 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
   
另外一个重要的因素是core的数量。在内存中,减少core的个数,使得同时运行的task减少,当前的task越少,内存越充足,从另一个层面减少超内存的问题。当然,这就牺牲了快速计算的性能、cpu的使用效率。但是对于一个频频失败的job而言,能成功运行,牺牲点效率和性能还是可以接受的。

   
另外,分区的数量对你的application来说也是很重要的。正如前面所述,分区越多,每个分区的数据就越少,怎么把握分区的数量并且又能让这个分区处理刚好的数据,而这需要权衡。如果你将当前的数据集分很多分区,那么每一个分区的数据就会很少,大量的分区意味着你会有很多的输出文件。这样做,看似没啥问题,输出文件多点也没啥问题。但是问题是数据源hdfs不得不控制文件空间大小,将压缩hdfs,降低其性能来确保这么多的输出文件正常运行。因此确定一个合适的分区尤为重要。


    回到图1,对于使用python的小伙伴来说,降低“spark.executor.memory”的值对解决内存overhead问题有帮助。

最终,能解决问题的关键点在于:

1.将"spark.yarn.executor.memoryOverhead"设置为最大值,可以考虑一下4096。这个数值一般都是2的次幂。

2.将rdd进行重新分区,这里可以考虑200k。在spark2.3的版本中,rdd成为了dateframe格式的数据。

3.将"spark.executor.cores"从8设置为4。将core的个数调小。

4.将"spark.executor.memory"从8g设置为12g。将内存调大。




小编有话说:


当时我遇见错误的时候,一顿瞎调,我将spark.yarn.executor.memoryOverhead的最大值设置我1500,当时真心不知道这个具体代表什么意义,后知后觉,还是需要多充电,有着急的时间,不如静下来学十分钟,这样得出的解决方案才算靠谱。




翻译链接:

https://gsamaras.wordpress.com/code/memoryoverhead-issue-in-spark/


https://blog.csdn.net/zuolovefu/article/details/79024899