[转帖]hadoop之CombineFileInputFormat篇_Hadoop,ERP及大数据讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  Hadoop,ERP及大数据讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 3517 | 回复: 0   主题: [转帖]hadoop之CombineFileInputFormat篇        下一篇 
刘伟
注册用户
等级:少校
经验:938
发帖:82
精华:0
注册:2013-6-24
状态:离线
发送短消息息给刘伟 加好友    发送短消息息给刘伟 发消息
发表于: IP:您无权察看 2013-7-1 14:02:43 | [全部帖] [楼主帖] 楼主

简介

本文主要介绍下面4个方面

1.为什么要使用CombineFileInputFormat

2.CombineFileInputFormat实现原理

3.怎样使用CombineFileInputFormat

4.现存的问题

使用CombineFileInputFormat的目的

在开发MR的程序时,mapper的主要作用是对数据的收集。一般情况下,为了能让mapper更快的运行,我们会对文件进行split,以便多个mapper同时运行。在这种情况下,为了让程序更好更快的运行,我们需要控制mapper的个数。Mapper的个数主要由文件的大小及我们所设置的mapred.min.split.size以及blockSize所决定(详细参考:http://ai-longyu.iteye.com/blog/1566633)

上面所说的在我们使用TextInputFormat和分析单个文件时是没有问题的,基本上mapper的个数能够控制在我们所预期的范围内。但是当我们使用多个文件作为input的时候,mapper的个数就不再是我们所期望的那样了,因为TextInputFormat继承的是FileInputFormat,而FileInputFormat的split操作是只针对单个文件,对于多个文件,是将每个文件进行split,而不能做一些合并的操作(尤其是大量的小文件)。

你会想为什么不能进行合并呢,有没有实现合并的split呢?在这个时候,CombineFileInputFormat就闪亮登场了。这里所说的CombineFileInputFormat是由官方提供的,只要我们搞清楚了官方是怎么实现的,就能够自己也实现一个了。接下来将逐步分析CombineFileInputFormat的实现了。

CombineFileInputFormat实现步骤

这里插一句,官方的CombineFileInputFormat并不是线程安全的。

先申明一下,这里分析所采用的源码是apache的1.0.3,分析的在org.apache.hadoop.mapred.lib.CombineFileInputFormat而不是org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat,这里分析的旧API,而没有分析新的API

生成split的信息是由

public InputSplit[] getSplits(JobConf job, int numSplits)


Job参数:job的配置信息

numSplits参数:期望的mapper数目,在这里根本就没有使用

//每个DN的最小split大小 
long minSizeNode = 0;
//同机架的最小split大小 
long minSizeRack = 0;
//最大的split大小 
long maxSize = 0;


这几个变量都可以从job的配置信息中获取

接下来就是获取input的路径列表,判断每个路径时候被Filter所允许,然后对允许的路径列表生成split信息列表,进入该类的核心方法

/** 

     * Return all the splits in the specified set of paths 

     *  

* @param job Job的配置信息 

* @param paths 输入源的路径列表 

* @param maxSize 最大的split大小 

* @param minSizeNode 每个DN最小的split大小 

* @param minSizeRack 每个rack最小的split大小 

* @param splits split信息列表 

* @throws IOException 

*/
private void getMoreSplits(JobConf job, Path[] paths,
long maxSize, long minSizeNode, long minSizeRack,
List<CombineFileSplit> splits)


生成每个文件的OneFileInfo对象

// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
      //构建每个input文件的信息,并将文件中的每个 
      //block信息收集到rackToBlocks、blockToNodes、nodeToBlocks中 
      files[i] = new OneFileInfo(paths[i], job,
      rackToBlocks, blockToNodes, nodeToBlocks);
      //增加所有文件的大小 
      totLength += files[i].getLength();
}


在下面就开始真正的生成Split信息了

第一次:将同DN上的所有block生成Split,生成方式:

1.循环nodeToBlocks,获得每个DN上有哪些block

2.循环这些block列表

3.将block从blockToNodes中移除,避免同一个block被包含在多个split中

4.将该block添加到一个有效block的列表中,这个列表主要是保留哪些block已经从blockToNodes中被移除了,方便后面恢复到blockToNodes中

5.向临时变量curSplitSize增加block的大小

6.判断curSplitSize是否已经超过了设置的maxSize

a) 如果超过,执行并添加split信息,并重置curSplitSize和validBlocks

b) 没有超过,继续循环block列表,跳到第2步

7.当前DN上的block列表循环完成,判断剩余的block是否允许被split(剩下的block大小之和是否大于每个DN的最小split大小)

a) 如果允许,执行并添加split信息

b) 如果不被允许,将这些剩余的block归还blockToNodes

8.重置

9.跳到步骤1

// process all nodes and create splits that are local
// to a node.
//创建同一个DN上的split 
for (Iterator<Map.Entry<String,
List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
iter.hasNext();) {
      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
      nodes.add(one.getKey());
      List<OneBlockInfo> blocksInNode = one.getValue();
      // for each block, copy it into validBlocks. Delete it from
      // blockToNodes so that the same block does not appear in
      // two different splits.
      for (OneBlockInfo oneblock : blocksInNode) {
            if (blockToNodes.containsKey(oneblock)) {
                  validBlocks.add(oneblock);
                  blockToNodes.remove(oneblock);
                  curSplitSize += oneblock.length;
                  // if the accumulated split size exceeds the maximum, then
                  // create this split.
                  if (maxSize != 0 && curSplitSize >= maxSize) {
                        // create an input split and add it to the splits array
                        //创建这些block合并后的split,并将其split添加到split列表中 
                        addCreatedSplit(job, splits, nodes, validBlocks);
                        //重置 
                        curSplitSize = 0;
                        validBlocks.clear();
                  }
            }
      }
      // if there were any blocks left over and their combined size is
      // larger than minSplitNode, then combine them into one split.
      // Otherwise add them back to the unprocessed pool. It is likely
      // that they will be combined with other blocks from the same rack later on.
      //其实这里的注释已经说的很清楚,我再按照我的理解��一下 
      /** 

          * 这里有几种情况: 

          * 1、在这个DN上还有没有被split的block, 

          * 而且这些block的大小大于了在一个DN上的split最小值(没有达到最大值), 

          * 将把这些block合并成一个split 

          * 2、剩余的block的大小还是没有达到,将剩余的这些block 

          * 归还给blockToNodes,等以后统一处理 

          */
      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
            // create an input split and add it to the splits array
            addCreatedSplit(job, splits, nodes, validBlocks);
      } else {
      for (OneBlockInfo oneblock : validBlocks) {
            blockToNodes.put(oneblock, oneblock.hosts);
      }
}
validBlocks.clear();
nodes.clear();
curSplitSize = 0;
}


第二次:对不再同一个DN上但是在同一个Rack上的block进行合并(只是之前还剩下的block)

// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these overflow
// blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
ArrayList<String> racks = new ArrayList<String>();
// Process all racks over and over again until there is no more work to do.
//这里处理的就不再是同一个DN上的block 
//同一个DN上的已经被处理过了(上面的代码),这里是一些 
//还没有被处理的block 
while (blockToNodes.size() > 0) {
      // Create one split for this rack before moving over to the next rack.
      // Come back to this rack after creating a single split for each of the
      // remaining racks.
      // Process one rack location at a time, Combine all possible blocks that
      // reside on this rack as one split. (constrained by minimum and maximum
      // split size).
      // iterate over all racks
      //创建同机架的split 
      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
      rackToBlocks.entrySet().iterator(); iter.hasNext();) {
            Map.Entry<String, List<OneBlockInfo>> one = iter.next();
            racks.add(one.getKey());
            List<OneBlockInfo> blocks = one.getValue();
            // for each block, copy it into validBlocks. Delete it from
            // blockToNodes so that the same block does not appear in
            // two different splits.
            boolean createdSplit = false;
            for (OneBlockInfo oneblock : blocks) {
                  //这里很重要,现在的blockToNodes说明的是还有哪些block没有被split 
                  if (blockToNodes.containsKey(oneblock)) {
                        validBlocks.add(oneblock);
                        blockToNodes.remove(oneblock);
                        curSplitSize += oneblock.length;
                        // if the accumulated split size exceeds the maximum, then
                        // create this split.
                        if (maxSize != 0 && curSplitSize >= maxSize) {
                              // create an input split and add it to the splits array
                              addCreatedSplit(job, splits, getHosts(racks), validBlocks);
                              createdSplit = true;
                              break;
                        }
                  }
            }
            // if we created a split, then just go to the next rack
            if (createdSplit) {
                  curSplitSize = 0;
                  validBlocks.clear();
                  racks.clear();
                  continue;
            }
            //还有没有被split的block 
            //如果这些block的大小大于了同机架的最小split, 
            //则创建split 
            //否则,将这些block留到后面处理 
            if (!validBlocks.isEmpty()) {
                  if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
                        // if there is a mimimum size specified, then create a single split
                        // otherwise, store these blocks into overflow data structure
                        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
                  } else {
                  // There were a few blocks in this rack that remained to be processed.
                  // Keep them in 'overflow' block list. These will be combined later.
                  overflowBlocks.addAll(validBlocks);
            }
      }
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
}
}


最后,对于既不在同DN也不在同rack的block进行合并(经过前两步还剩下的block),这里源码就没有什么了,就不再贴了

源码总结:

合并,经过了3个步骤。同DN----》同rack不同DN-----》不同rack

将可以合并的block写到同一个split中

使用自定义的CombineFileInputFormat

MultiFileCombineInputFormat
package org.rollinkin.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
/** 

    * 多文件合并split的输入format 

    *  

    * @author rollinkin 

    * @date 2012-10-29 

    * @version 1.0 

    * @since 1.0 

    */
public class MultiFileCombineInputFormat extends
CombineFileInputFormat<LongWritable, Text> {
      @Override
      public RecordReader<LongWritable, Text> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter)
      throws IOException {
      @SuppressWarnings({ "rawtypes", "unchecked" })
            Class<RecordReader<LongWritable, Text>> rrClass = (Class)CombineLineRecordReader.class;
            return new CombineFileRecordReader<LongWritable, Text>(job,(CombineFileSplit) split, reporter,rrClass);
      }
}


CombineLineRecordReader,这个其实没有什么内容,就是包装了一个Reader

package org.rollinkin.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
public class CombineLineRecordReader implements
RecordReader<LongWritable, Text> {
      private LineRecordReader delegate;
      public CombineLineRecordReader(CombineFileSplit split, Configuration conf,
      Reporter reporter, Integer idx) throws IOException {
            FileSplit fileSplit = new FileSplit(split.getPath(idx),
            split.getOffset(idx), split.getLength(idx),
            split.getLocations());
            delegate = new LineRecordReader(conf, fileSplit);
      }
      @Override
      public boolean next(LongWritable key, Text value) throws IOException {
            return delegate.next(key, value);
      }
      @Override
      public LongWritable createKey() {
            return delegate.createKey();
      }
      @Override
      public Text createValue() {
            return delegate.createValue();
      }
      @Override
      public long getPos() throws IOException {
            return delegate.getPos();
      }
      @Override
      public void close() throws IOException {
            delegate.close();
      }
      @Override
      public float getProgress() throws IOException {
            return delegate.getProgress();
      }
}


具体的使用我就不再留了,其实很��单,就是把你的InputFormat设置成MultiFileCombineInputFormat 就可以了(在2012-11-09之前提供了一个reader实际上是不可用,他存在跨块读取的问题,

这里就不在提供了。如果使用了,请更新一下。哎,又传播错误的消息了)

现存问题

合并后会造成mapper不能本地化,带来mapper的额外开销,需要权衡

这里只实现了简单的Text的方式的合并,对于可压缩的、二进制等文件没有提供

这里提供的自定义的实现,只是简单的按行读取




赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论