mapper
/* KEYIN:默认是mr框架所读到的一行文本的起始偏移量,long VALUEIN:默认是mr所读到的一行文本的内容 KEYOUT:是用户自定义逻辑处理完成后输出数据中的key string VALUEOUT:是用户自定义逻辑处理完成后输出数据中的value,在此处是单词次数 */ public class wordcountmapp extends Mapper<LongWritable, Text, Text, IntWritable> { // maptask会对没一行输入数据调用一次map方法 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将maptask传给我们的文本内容抓换成String,易操作 String line = value.toString(); // 根据空格将这一行且分成单词 String[] words = line.split(" "); //将单词输出为《单词,1》 for (String word : words) { // 将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词,相同的单词分到相同的reduce context.write(new Text(word),new IntWritable(1)); } } }
reduce
/* reducer输入类型要与map输出一致 KEYOUT是单词,VALUEOUT是单词次数 */ public class wordcountredu extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // <hello,1> <hello,1> <hello,1> <hello,1> <hello,1> <hello,1> // <banna,1><banna,1><banna,1><banna,1><banna,1><banna,1><banna,1> //reduce 相同单词的运行一次,一组 int count=0; // Iterator<IntWritable> iterator = values.iterator(); // while (iterator.hasNext()){ // count+=iterator.next().get(); // } for (IntWritable value:values){ count+=value.get(); } context.write(key,new IntWritable(count)); } }
driver
package com.qq.bd; import com.qq.bd.wordcountDriver; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class wordcountDriver { /* 相当与一个yarn集群的客户端 需要在此封装mr程序的相关参数,制定jar包, 最后提交给yarn */ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 获取job实例传入参数配置 Configuration conf = new Configuration(); // 设置采用yarn框架来运行程序 conf.set("mapreduce.framework.name", "yarn"); // 设置yarn的主机名 conf.set("yarn.resourcemanager.hostname", "mini01"); // 设置文件类型 conf.set("fs.defaultFS", "hdfs://mini01:9000/"); Job job = Job.getInstance(conf); //指定mapper reduce class job.setMapperClass(wordcountmapp.class); job.setReducerClass(wordcountredu.class); // 设置本程序jar所在路径 job.setJarByClass(wordcountDriver.class); //指定mapper输出的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定最终输出的key value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定输入文件的路径,指的是在hdfs上的路径,用.lib.input FileInputFormat.setInputPaths(job, new Path(args[0])); // 指定输出结果路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 将job相关参数,及jar路径提交给yarn运行 // job.submit(); // 前者job.submit() 不知道程序运行情况,且不会推出 // true表示把集群运行信息打印出来 boolean res = job.waitForCompletion(true); // 零表示退出 System.exit(res ? 0 : 1); //可以用shell脚本来运行,$?,获取程序退出码,为0成功 为1failed } }
拷贝到集群中用 hadoop jar 运行因为没有打成可运行的jar包,
jar 包名 主类
hadoop jar wordcount.jar com.ql.wordcount.class path1 path2
提交到yarn ,启动mrappmaster来调度协调map reduce
每台机器上通过inputformat 读取hdfs上的文件,
通过outputcollect,根据reduce产生对应的分区,
map任务执行完毕后,才会继续执行reduce任务,
同一种单词会传入一个reduce上