Cloudedera公司提供的的一项服务是帮助客户优化Hadoop集群上MapReduce 任务。由于MapReduce和HDFS是运行用户自行开发程序的复杂集群系统,所以优化并没有绝对的守则,相反的这是一个像医生看病一样的过程,需要对症下药。
所以,这个过程中经验丰富的医生很重要。熟练的开发人员对常见问题甚产生了第六感。在接触了使用各种不同负载,数据集,硬件的Cloudera客户后,我想把这些经验分享给你们。
文章是根据作者经验写成,可能并不适用于某些具体情况。读者应该基于自己的集群进行性能基准测试。作者在一个4节点的集群上用40G的单词计数程序进行了测试优化前后的时间是 8分30秒 和33秒。
窍门 1) 正确配置Hadoop集群症状:
*当所有的MapReduce任务栏位都在运行任务时,用top命令观察到slave节点仍然相对的空闲。
*用top观察到内核进程RAID(mdX_raid*)或pdflush占用大量CPU
*Linux平均负载经常高于系统CPU数x2
*执行任务时,Linux平均负载低于系统CPU数
*节点上超过几MB的SWAP使用量
诊断:
改进MapReduce性能的第一步是确保你的集群配置是优化过的。对于初学者,可以参考��前的文章
http://blog.cloudera.com/blog/2009/03/30/configuration-parameters-what-can-you-just-ignore/
这里是必作更多的优化选项。
确保挂载的DFS和Mapreduce存储设备是挂载在noatime模式下,这样会禁用文件系统记录访问时间,改善IO性能。
避免在TaskTracker和Datanode上使用RAID和LVM,这些会折损IO性能。
尽量配置mapred.local.dir 和 dfs.data.dir指向不同的物理磁盘,以使用所有的IO容量。当集群有任务时,运行sysstat包中的iostat -dx 5可以确认各磁盘的使用状况。
确保磁盘都开启了SMART监控,MapReduce有容错能力,但是故障磁盘会让任务失败,需要重新执行而影响整体性能。如果你发现一个TaskTracker节点进入了多个任务执行的黑名单,它很可能是有了故障磁盘。
用Ganglia或相似的软件检测网络/SWAP使用状况,Ganglia能很好的监控Hadoop集群性能。如果发现节点使用了Swap,从mapred.child.java.opts减少RAM使用量。
性能对比:
这个方法做性能对比需要重装集群,作者略过。
���门 2) 在集群上使用 LZO 压缩插件
症状:
*应用于中间数据LZO压缩始终是个好方法。
*MapReduce 任务输出文件尺寸很大。
*在任务运行时Slave节点上top和iostat中显示高iowait。
诊断:
几乎任何产生大量map输出的MapReduce任务都能从LZO压缩算法受益。虽然LZO增加了一些CPU的负载,但是shuffle阶段减少的大量磁盘IO操作会把时间完全节省回来。
当job要处理大量数据时,LZO压缩也可以增加输出方面的的性能。在默认的3份复制配置下,每1GB压缩省下的空间都相当于节省了3GB的IO写操作。
要开启LZO压缩,请见另一篇文章,
http://blog.cloudera.com/blog/2009/06/parallel-lzo-splittable-compression-for-hadoop/
记得要把mapred.compress.map.output设为true。
性能对比:
禁用LZO只在测试中轻微延长了运行时间。但是文件写出量计数FILE_BYTES_WRITTEN从3.5G增长到9.2G,显示出62%的IO优化效果,在一个job独自运行的环境下,IO并不是瓶颈,所以时间缩短并不明显。当在高任务并发的集群上运行时,60%的IO减少会带来明显的速度提升。
窍门 3) 优化map,reduce任务运行的数量
症状:
每个 map 或 reduce 任务都在30-40秒内结束。
一个大job没有使用上所有集群中的可用槽位。
在大部分mapper和reducer都订好运行计划后,1到2个仍在pending状态直到最后才单独运行。
诊断:
优化map和reduce的任务是非常重要但是经常被忽视,这里介绍几个我常用的相关设置方法:
如果每个任务只执行30-40秒就结束,请减少总的task数量。Task的基本设置和计划本身会消耗几秒钟的时间。所以如果Task执行非常快的话,时间就都浪费在准备Task上了。也可以开启JVM的reuse功能来减少建立task的基本开销。
如果job要处理超过1TB的数据,可以考虑增加输入数据的块Block的大小从256MB到512MB。这样也会减小需要运行的Task数。可以通过如下命令改变数据块大小:
hadoop distcp -Ddfs.block.size=$[256*1024*1024] /path/to/inputdata /path/to/inputdata-with-largeblocks.
执行完该命令就可以清除原来的文件了。
在保证每个任务执行都超过30-40秒后,可以增加mapper task为mapper slot(可以执行mapper 机器)的整数倍,如果你有100个可以运行Map任务的节点,尽量不要运行101个Map Task,第101个Map task 会在第一批100个Map任务执行完之后才执行,这点主要针对的是小型集群和小型任务。
不要计划执行太多的Reduce任务,对于大多数任务,我们建议Reduce任务数要等于或小于集群中可运行Reduce任务的节点数。
性能测试:
我使用一个参数-Dmapred.max.split.size=$[16*1024*1024] 来展示设置了过多任务的wordcount程序。这样会产生2640个而不是默认的360个任务来执行该程序。当以这种配置运行时单个的任务平均只用9秒,在JobTracker的监控页面上可以看到正在map任务数在0到24之间波动,整个Job花了17分52秒,是原来配置的2倍。
窍门 4)自己编写一个Combiner组合器
症状:
*当执行数据聚合的任务时,并且Reduce输入的组类别数远小于Reduce输入的记录数时。
*Job需要执行大量的Shuffle工作。(例如:每个Map节点输出若干GB的数据)
*分割过的记录数是从JobCounter里看到的Map输出记录数的几倍。
诊断: 如果你的MapReduce程序是计算记录的聚合值(sum,count,avg...),就需要编写对应的Combiner组合器,组合器会在数据直接进入Reducer前来完成一部分初始的聚合工作。MapReduce框架会智能的调用Combiner减少在Map和Reduce任务之间传输的需要耗费IO资源(磁盘/网络)的数据。
性能测试:
我修改了单词计数程序,为了对比效果,一组移除了调用setCombinerClass的部分,另一组保持原状。修改后平均Map任务的执行时间从33秒变为48秒,任务shuffle的数据从1GB变为1.4GB,整个Jobs的时间从8分30秒变为15分42秒。接近消耗了两倍的时间。该测试是在打开了LZO压缩下做的,如果不开启压缩,影响将会更加明显。
窍门 5)使用最恰当和紧凑的Writable数据类型来存储数据。
症状:
*用Text对象来处理非文本或者复杂数据。
*在大部分输出数据远小于数字类型上限时仍然使用IntWritable和 LongWritable来装载数据。
诊断:
当用户还不熟悉MapReduce编程,或者刚刚从Streaming模式切换到编写JavaMapReduce,他们通常在不必要的地方使用文本Text writable数据类型。虽然使用方便,但是把数字数据转换成UTF-8字符串的效率很低会消耗大量CPU资源。当处理非文本数据,请考虑使用更恰当的数据类型,比如IntWritable, FloatWritable 等。
为了更进一步避免文本类型转换的性能损耗,中间数据存储使用二进制的Writable数据类型会使用更少的空间,从而也起到节省磁盘、网络等集群瓶颈的压力。当处理整数类型时,使用VIntWritable或VLongWritable等变长数据类型在处理小整数数字时会节省更多的空间。比如,整数4会被存储为一个字节,10000会被存为2个字节,这种数据类型在处理大量数据只是小整数数字时更加有效。
如果Hadoop自带数据类型不能满足你的需要,可以考虑编写自定的数据类型,这并不复杂,还会显著节省字符串转换的时间。如果准备这么做,请确保提供对应的RawComparator,例子可以参考原生Writable类型的实现方法。
同样的原理,对于多级的MapReduce Job,即使你的MapReduce最后输出的是TextWritable数据,在中间数据存储时尽量使用二进制格式,比如SequenceFile,仍然可以优化程序对集群资源的占用。
性能测试:
对于单词计数程序,将程序中计数从IntWritable改为Text最为对比。在reducer中做累加时使用Integer.parseString(value.toString())来计算。 修改过的版本比原来的版本要慢10%,整个JOB运行花了9分多钟,每个map任务花了36秒而原来的版本是33秒。因为整型数据的转换相对比较快,这并没有带来明显的改变。在实际的案例中,作者见到过因为使用更有效的数据类型而将性能提升2-3倍的例子。
窍门 6)重用Writables对象
症状:
在集群配置参数mapred.child.java.opts中加入 -verbose:gc -XX:+PrintGCDetails。然后运行一些任务观察系统日志。如果发现Java虚拟机GC垃圾回收频繁,并占用了很多时间,这可能是因为你的程序建立了太多不必要的对象。
在你的代码文件中使用grep搜索 “new Text” 或 “new IntWritable” 。如果你在一个内部循环,或者map/reducer函数中找到类似代码,这个窍门会帮助到你。
这个窍门对于大量占用内存的任务效果更加明显。
第一种MapReducer用户常犯的错误是对每个mapper和reducer都实例化Writable对象,比如可能用如下代码进行单词计数:
public void map(...) {
...
for (String word : words) {
output.collect(new Text(word), new IntWritable(1));
}
}
这个实现方法会产生几千个存活时间非常短的对象。比起让Java的内存垃圾收集来处理这些,不如这样编写这个程序:
class MyMapper ... {
Text wordText = new Text();
IntWritable one = new IntWritable(1);
public void map(...) {
...
for (String word : words) {
wordText.set(word);
output.collect(word, one);
}
}
}
性能测试:
照着上面的修改方法,我发现修改前后对Jobs运行时长并没有影响。这主要是因为测试集群默认配置了1GB堆内存heap size给每个任务,所以不会触发垃圾收集器。但是当设置堆内存为200MB时,过多的对象会明显的拖慢Jobs的运行速度时间从8分30秒增加到了17分钟。原始的版本并没有因为堆内存减小而运行缓慢,修改代码相对比较简单,我建议始终这样编写你的代码,可能这并不会立刻改善所有的Job,但是当内存紧张时就会非常不同了。
窍门 7)使用一些简单的跟踪方法“Poor Man’s Profiling”观察你的任务实际在做什么
这是一个我在观察MapReduce任务性能时一直使用的技巧,专业监控工具的用户可能并不认同这种方法,但是这却取得了很好的效果。
简单的跟踪方法是指,当运行比较缓慢的job时,SSH到一台Slave节点上,然后每次间隔几秒的反复执行5-10次“sudo killall -QUIT java”,不用担心这样并不会退出任何任务。然后使用JobTracker界面观察这个节点运行的一个task的stdout标准输出日志,或者也可以用/var/log/hadoop/userlogs/文件观察正在运行的任务。每次通过前面命令发送的系统SIGQUIT信号都会产生Jvm的栈报错的stack trace输出。
要解读这些输出需要一些经验,这里有一些我常用的方法:
对于输出的日志,快速的检查一下调用的Java包名,如果不包含你自己代码的包名,一般可以直接跳过。
当你找到一条栈报错包含你自己的包名时,快速的在心里回忆一下它是什么功能,比如:“一些执行数字格式的代码”这时先不用考虑具体的代码报错位置(行数)。
用同样的方法处理下一个由于杀进程产生的日志,并做一些记录。
在检查了4-5个日志输出后,你可能已经从中看出一些端倪,��果那些部分正是你认为应该运行很快的代码,你已经找到了问题的关键。比如你做了10次日志输出,其中5次包含NumberFormat的代码,这就意味着你花了50%的CPU时间在处理数字的格式,你可能需要改进相关的代码了。
这当然不是一种真正严谨的探查方法,但是我发现这是一种快速发现CPU瓶颈的好方法,而且不用修改配置。这也是一种练习发现普通任务停止和程序异常任务停止区别的好机会。
下面是我用该方法发现的几个常见的性能问题:
NumberFormat 转换慢– 尽量避免。
String.split字符串分割,文本UTF8编码转换通常比你想象的要慢 – 见上几条窍门尽量避免它们
直接拼接字符串而不是用StringBuffer.append
更多的技巧欢迎提交到Github上
http://github.com/toddlipcon/performance-blog-code/
附录:性能测试集群的配置
每个节点使用的是双4核Nehalem CPU打开超线程功能,24G内存,12x1TB磁盘,TaskTracker设置成6个Map槽位和6个Reduce槽位,设置的比较低是因为我们有时要建立多套Hadoop在同一套硬件上