MapReduce的类型与格式

MapReduce的类型与格式

转载来源

MapReduce的类型

默认的MR作业

  1. 默认的mapper是Mapper类,它将输入的键和值原封不动地写到输出中
  2. 默认的partitioner是HashPartitioner,它对每条记录的键进行哈希操作以决定该记录应该属于哪个分区(每个分区对应于一个reduce任务)
  3. 默认的reducer是Reducer类,它将所有的输入写到输出中
  4. map任务的数量等于输入文件被划分成的块数
  5. reduce任务的个数的选择: 一个经验法则是目标reducer保持在每个运行5分钟左右且产生至少一个HDFS块的输出比较合适
  6. 默认的输入格式是TexInputFormat,输出是TextOutpFormat

输入格式

输入分片与记录

  • 一个输入分片就是由单个map操作来处理的数据块,并且每一个map只处理一个分片、
  • 每个输入分片分为若干个记录,每条记录就是 一个键值对,map将一个接一个地处理记录
  • 输入分片和记录都是逻辑概念,不一定对应着文件,也可能对应其他数据形式,如对于数据库,输入分片就是对应于一个表上的若干行,一条记录对应着其中的一行
  • 输入分片只是指向数据的引用,不包含数据本身

    1. InputSpilt接口(Java中的实现),包含
      • 以字节为单位的长度,表示分片的大小,用以排序分片,以便优先处理最大的分片,从而最小化作业运行时间
      • 一组存储位置,供MR系统使用一边将map任务尽可能放在分片数据附近
      • 该接口由InputFormat创建
    2. InputFormat
      • 运行作业的客户端使用getSplits方法计算分片,并将结果告知application master,后者使用其存储信息来调度map任务从而在集群集群上处理这些分片数据
      • map任务将输入分片传给createRecordReader方法来获取这个分片的RecordReader(就像是记录上的迭代器),map任务用这个RecordReader来生成记录的键值对,然后再将键值对传递给map函数(参见run方法)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/////InputFormat接口
public abstract class InputFormat<K, V> {
public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

/////Mapper的run方法
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()/*委托给RecorReader的同名方法,为mapper产生键值对*/) {
map(context.getCurrentKey(), context.getCurrentValue(), context);//从RecordReader中检索出并传递给map方法
}
cleanup(context);
}

文件输入–FileInputFormat

  1. FileInputFormat类提供两个功能:
    • 指出作业的输入文件位置
    • 实现了为输入文件产生输入分片的功能(把分片切割成记录的功能由其子类完成)
      此处输入图片的描述
  2. 输入路径
    public static void addInputPath(Job job, Path path)
    public static void addInputPaths(Job job, String commaSeparatedPaths)
    public static void setInputPaths(Job job, Path... inputPaths)
    public static void setInputPaths(Job job, String commaSeparatedPaths)

    • 前两者用于加入一个或多个路径到路径列表中,后两者一次设定完整的路径列表(replacing any paths set on the Job in previous calls)
    • 一条路径可以是文件、目录或者glob(文件和目录的结合),但是目录在默认情况下不会进行递归处理,如果目录下存在子目录,则要么采用glob的形式,要么设置过滤器过滤子目录(因为子目录会被当作文件而报错),或者更改属性设置让其可以递归处理
    • FileInputFormat有默认过滤器,用以过滤隐藏文件(自定义的过滤器会和这个默认的一起工作)
  1. 输入分片: FileInputFormat只分割大文件(超过块的大小)
    • 分片计算公式 max(minimumSize, min(maximumSize, blockSize))
    • 默认情况 minimumSize < blockSize < maximumSize
  2. 小文件与CombineFileInputFormat

    • CombineFileInputFormat类可以把多个文件打包到一个分片中(在决定将哪些块放到同一分片时,会考虑节点和机架的因素)
  3. 避免切分
    • 设置最小分片大小以避免切分、
    • 重写isSplitable方法
  4. mapper中文件信息
    • 调用Mapper类中Context对象的getInputSplit方法来获得InputSplit,对于FileInputFormat,它会被转成FileSplit
    • 注意此处的getInputSplit方法和InputFormat中的getSplit方法,后者是用于为整个输入计算分片,而前者是为某个mapper获取该输入分片的相关信息
      此处输入图片的描述
  5. 把整个文件作为一条记录处理

文本输入–TextInputFormat

Hadoop非常擅长处理非结构化文本数据

  1. TextInputFormat是默认的InputFormat

    • 每条记录是一条输入,键是LongWritable,存储该行在整个文件中的字节偏移量,值是该行的内容(不包括任何行终止符)
    • 由于此处的逻辑记录是以行为单位的,因而可能出现某一行会跨文件块存放,从未会为‘本地化’的map任务带来远程读操作的开销(这是因为分片是和行对齐的而不是hdfs块,参考图示)
    • 此处输入图片的描述
  2. 控制一行最大的长度

    • 目的是应对损坏的文件,文件的损坏可能对应一个超长行,从而导致内存溢出
    • 长度通过属性mapreduce.input.linerecordreader.line.maxlength设置
  3. 关于KeyValueTextInputFFormat

    • 目的是应对那些每行内容是一个键值对的文件(之所以是键值对,是因为它经过了一些操作,比如TextOutputFormat的输出就会将键值对写入文件,两者之间使用分隔符分开)
    • 所以使用时要指定键值对之间的分隔符,默认是制表符(属性mapreduce.input.keyvaluelinere cordreader.key.value.separator),且保持原来的键而不是使用偏移量作为键
  4. 关于NLineInputFormat

    • 一般每个mapper收到的行数不同(行数取决于分片大小和行长度),通过该类可是使每个mapper收到的行数相同
    • 键是文件中行的字节偏移量,值是行本身
    • 应用场景
      • 仿真
      • 用Hadoop引导从多个数据源(如数据库)加载数据,每行一个数据源
  5. 关于xml
    • StreamXmlReccordReader
Have a nice day!