MapReduce设计模式:Chaining
?
Chaining这种设计模式非常重要,主要是因为你通常无法通过单个MapReduce Job来完成工作;某些Job必须串行,因为前者Job的输出会成为下个Job的输入;某些Job可以并行,因为Job运行之间没有关系;有些Job的Mapper是对日志的重复处理,需要将代码规范化。不管怎么说,不能靠单个MapReduce程序就完成工作是我们的挑战。
?
我们这节来讨论下Chaining模式,由于Hadoop的设计是为了MapReduce程序的高速运行,单个Job完全没有问题,但是这个并不能解决工作中的问题。在 工作中,通常任务根本无法通过单个Job完成,除非是像WordCount这样的任务。也就是说对于多阶段的MapReduce程序控制上需要我们做更多的编码,对Job进行控制。如果你感觉任务过于复杂,控制难于从心时,看下这个Oozie这个Apache Project,这个Oozie我也没有细看,能够通过配置对Job进行更为细致的控制。我们本节的任务不是讨论MapReduce框架处理Job的复杂性,不过对于Job执行的优劣必须知道。
?
下面我们讨论Chaining设计模式的实现。
首先,回忆下我们讨论过的TOP-K问题,会怎么实现呢?
?
首先我们先对输出的数据进行排序,算出总排序,这个需要一个Job;然后再对这个Job的输出作为下一个Job的输入,计算TOP-K数据,代码不再单独列出,思想理解就行。
这个计算方法就利用了一种很重要的Chaining方法,Serial Chaining。这个是指将MapReduce Job逐个串起来,按照顺序执行。用Unix-Style方式表示就是:
mapreduce-1 | mapreduce-2 | mapreduce-3 | ...
?这个是最直观的Mapreduce Job的编写方式。通过这种控制,我们能完成工作,但是问题在于:这样写出来的代码如果Job比较多的话,控制各个mapreduce的执行状态会让代码特别丑;如果只有少数几个的话,还是可以接受的,如果Job串行过多,这种方法要慎用。
其实Hadoop的Mapreduce支持依赖方式的编码。如果有这么个场景:mapreducea处理来自数据库的数据集合,mapreduceb处理来自Hbase的数据集合,mapreducec则对这两个集合进行Join操作,Join操作的内容参考http://isilic.iteye.com/blog/1772945。这样这三个mapreduce就有了依赖关系。mapreducea和mapreduceb两者可以并行执行,mapreducec则依赖于前两者的执行结果。这个用上面提到的Serial Chaining也不好解决。
实际上,Hadoop实现的mapreduce程序已经帮助我们完成相关的控制;在Job中,可以设置依赖的Job,直接提交最终的Job,MapReduce会控制依赖关系的执行:
mapreducec.addDependingJob(mapreducea);mapreducec.addDependingJob(mapreduceb);submit(mapreducea,mapreduceb,mapreducec);
?
注意这个并不是真正的mapreduce依赖,是Job的依赖,这里只是点明三者的关系。
使用这种方法,需要对Job的执行状态进行监控,确保三者都执行完毕后,主程序再退出。
?
上面两种方法都是对于Job依赖的情况,前者是自己控制,后者定义好依赖关系,提交给系统控制。不过现在又这么一种情况,在Document Information Retrieval中,需要对文档的Stop word进行处理,还需要对Stemming进行继续的处理,后续再进行lemmatization处理,处理完成后作为Term进行后续的正常处理,如词频统计,分词等。对于Document的一系列操作难道要写多个Mapreduce吗?不用,Hadoop提供了ChainMapper和ChainReducer来处理这种情况。我们来详细学习下ChainMapper和ChainReducer这两个方法。
?
ChainMapper:在单个map节点执行多个mapper逻辑,这个类似于Unix的管道操作,管道处理记录;其中前者的处理输出作为下一个的mapper的输入,mapper chain执行完毕后,才会进入到partitioner阶段,后面会继续进入到Reducer阶段。
?
ChainReducer:难道你会认为ChainReducer的功能和ChainMapper类似吗?No,完全不是。ChainReducer中可以设置单个Reducer和多个Mapper,执行完Reducer之后会再顺序执行Mapper的逻辑(吐槽下这个ChainReducer的命名),跟Reducer Chain完全没有关系。
?
按照官网的说法,ChainMapper的优点是更好的利用IO,在大多数情况下,单个Mapper也能完成ChainMapper的内容,不过这样的设计更好的是软件工程的功能化、内聚化的体现。ChainReducer能偶更好地利用reducer节点,方便实现reducer输出后的进一步数据逻辑处理,实际上就是ChainReducer中的Mapper处理逻辑。
本文开始的mapreduce程序可以逻辑表述为:
?
[ map | reduce]+
?
ChainMapper和ChainReducer用逻辑关系式可以表示为:
[ map* | reducer mapper* ]
?这个逻辑表达式一定要理解数据流的走向,最好能结合代码看下。
?
将上面的内容结合起来,就可以得到本节的核心内容,表示为:
[ map* | reducer mapper* ]+
这个关系式如果能理解,再结合自己工作中的逻辑,就算是深入的理解本节的内容了。
?
最后贴个wordcount的代码,根据本节的内容做了个更改,逻辑性方面就不要再追究了,仅供演示使用。
代码不做解释,大家就是需要理解一下这个过程:
?
public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map ( Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class UpperCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable> { public void map ( Text key, IntWritable value, Context context ) throws IOException, InterruptedException { String k = key.toString(); context.write(new Text(k.toUpperCase()), value); } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce ( Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static class ThresholdFilterMapper extends Mapper<Text, IntWritable, Text, IntWritable> { public void map ( Text key, IntWritable value, Context context ) throws IOException, InterruptedException { int cnt = value.get(); if (cnt > 10) context.write(key, value); } } public static void main ( String[] args ) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } JobConf jc = new JobConf("JobControl"); Job job = new Job(jc); Configuration tokenConf = new Configuration(false); ChainMapper.addMapper(job, TokenizerMapper.class, Object.class, Text.class, Text.class, IntWritable.class, tokenConf); Configuration upperConf = new Configuration(false); ChainMapper.addMapper(job, UpperCaseMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, upperConf); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); Configuration reducerConf = new Configuration(false); ChainReducer.setReducer(job, IntSumReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, reducerConf); Configuration thresholdConf = new Configuration(false); ChainReducer.addMapper(job, ThresholdFilterMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, thresholdConf); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
?