当前位置: 代码迷 >> 综合 >> idea wordcount
  详细解决方案

idea wordcount

热度:84   发布时间:2023-12-14 07:13:41.0

mapper

/*
KEYIN:默认是mr框架所读到的一行文本的起始偏移量,long
VALUEIN:默认是mr所读到的一行文本的内容
KEYOUT:是用户自定义逻辑处理完成后输出数据中的key string
VALUEOUT:是用户自定义逻辑处理完成后输出数据中的value,在此处是单词次数
 */
public class wordcountmapp extends Mapper<LongWritable, Text, Text, IntWritable> {
    //    maptask会对没一行输入数据调用一次map方法
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        将maptask传给我们的文本内容抓换成String,易操作
        String line = value.toString();
//        根据空格将这一行且分成单词
        String[] words = line.split(" ");
//将单词输出为《单词,1》
        for (String word : words) {
//            将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词,相同的单词分到相同的reduce

            context.write(new Text(word),new IntWritable(1));
        }
    }
}


reduce

/*
reducer输入类型要与map输出一致
KEYOUT是单词,VALUEOUT是单词次数
 */
public class wordcountredu extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//        <hello,1> <hello,1> <hello,1> <hello,1> <hello,1> <hello,1>
//       <banna,1><banna,1><banna,1><banna,1><banna,1><banna,1><banna,1>
//reduce 相同单词的运行一次,一组
        int count=0;
//        Iterator<IntWritable> iterator = values.iterator();
//        while (iterator.hasNext()){
//            count+=iterator.next().get();
//        }
        for (IntWritable value:values){
            count+=value.get();
        }
context.write(key,new IntWritable(count));
    }
}


driver


package com.qq.bd;

import com.qq.bd.wordcountDriver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class wordcountDriver {
    /*
    相当与一个yarn集群的客户端
    需要在此封装mr程序的相关参数,制定jar包,
    最后提交给yarn
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//        获取job实例传入参数配置
        Configuration conf = new Configuration();
//        设置采用yarn框架来运行程序
        conf.set("mapreduce.framework.name", "yarn");
//        设置yarn的主机名
        conf.set("yarn.resourcemanager.hostname", "mini01");
//        设置文件类型
        conf.set("fs.defaultFS", "hdfs://mini01:9000/");
        Job job = Job.getInstance(conf);
//指定mapper reduce class
        job.setMapperClass(wordcountmapp.class);
        job.setReducerClass(wordcountredu.class);

//        设置本程序jar所在路径
        job.setJarByClass(wordcountDriver.class);

//指定mapper输出的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
//指定最终输出的key value的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
//指定输入文件的路径,指的是在hdfs上的路径,用.lib.input
        FileInputFormat.setInputPaths(job, new Path(args[0]));
//    指定输出结果路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
//        将job相关参数,及jar路径提交给yarn运行
//        job.submit();

//        前者job.submit() 不知道程序运行情况,且不会推出
//        true表示把集群运行信息打印出来
        boolean res = job.waitForCompletion(true);
//        零表示退出
        System.exit(res ? 0 : 1);
//可以用shell脚本来运行,$?,获取程序退出码,为0成功 为1failed

    }
}


 

拷贝到集群中用  hadoop jar 运行因为没有打成可运行的jar包,

jar 包名 主类

hadoop jar wordcount.jar  com.ql.wordcount.class path1 path2



提交到yarn ,启动mrappmaster来调度协调map reduce

每台机器上通过inputformat 读取hdfs上的文件,

通过outputcollect,根据reduce产生对应的分区,

map任务执行完毕后,才会继续执行reduce任务,

同一种单词会传入一个reduce上


  相关解决方案