Process Analysis
Word count: for each word in the input data, count how many times it appears.
Process diagram (image from the web):
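To make the flow concrete, here is a small hypothetical example (the file name matches the input path used in the Driver below). Suppose word.txt contains:

    hello world
    hello hadoop

The job then writes one tab-separated line per distinct word, sorted by key, to the output file part-r-00000:

    hadoop	1
    hello	2
    world	1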
Implement the Mapper, Reducer, and Driver
WordCountMapper:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text k = new Text();
    private IntWritable v = new IntWritable(1);

    /**
     * Override the map method.
     * @param key the byte offset of the line within the file
     * @param value the content of the line
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get one line of input
        String valueString = value.toString();
        // Split the line into words
        String[] strings = valueString.split(" ");
        // Emit a (word, 1) pair for each word
        for (String string : strings) {
            k.set(string);
            context.write(k, v);
        }
    }
}
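One caveat in the mapper: String.split(" ") keeps empty tokens between consecutive spaces, so a run of spaces in the input would be counted as an empty "word". A minimal plain-Java sketch (no Hadoop required; the class name is my own) shows the behavior:

public class SplitDemo {
    public static void main(String[] args) {
        // Two spaces between the words produce an empty middle token
        String[] tokens = "hello  world".split(" ");
        System.out.println(tokens.length);          // prints 3
        System.out.println("[" + tokens[1] + "]");  // prints []
    }
}

Splitting on the regex "\\s+" (after trimming the line) is a common way to avoid this; the mapper above keeps the original single-space split.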
WordCountReduce:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable v = new IntWritable(0);

    /**
     * The reduce (merge) step.
     * @param key the key
     * @param values the list of values that share this key
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Running count
        int count = 0;
        // Sum the values for this key
        for (IntWritable value : values) {
            count += value.get();
        }
        // Store the total
        v.set(count);
        // Emit (word, total)
        context.write(key, v);
    }
}
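After the shuffle, each reduce call receives one key together with all of its values: for the hypothetical input above, the call for "hello" is effectively reduce("hello", [1, 1]) and writes ("hello", 2). Because addition is associative, this same class could also be registered as a combiner (job.setCombinerClass(WordCountReduce.class)) to pre-aggregate counts on the map side, though the Driver below does not do so.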
WordCountDriver:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Point Hadoop at the local installation (needed on Windows)
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");
        // Get the job object
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(configuration);
        //configuration.set("mapreduce.framework.name","local");
        //configuration.set("fs.defaultFS","file:///");
        Job job = Job.getInstance(configuration);
        // Set the class used to locate the jar
        job.setJarByClass(WordCountDriver.class);
        // Set the Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
        // Set the Mapper output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input file and output directory
        FileInputFormat.setInputPaths(job, new Path("E:\\hdfs\\input\\word.txt"));
        Path outPath = new Path("E:\\hdfs\\output");
        // Delete the output directory if it already exists, otherwise the job fails
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        // Submit the job and wait for it to finish
        boolean waitForCompletion = job.waitForCompletion(true);
        System.out.println(waitForCompletion);
        System.exit(waitForCompletion ? 0 : 1);
    }
}
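As written, the Driver hardcodes Windows paths, so the job runs in local mode straight from the IDE. To submit the same jar to a cluster, a common variant (hypothetical, not part of the original code) takes the paths from the command line instead:

// Hypothetical variant: read the input/output paths from args
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

and is launched with something like hadoop jar MapReduce-1.0-SNAPSHOT.jar WordCountDriver /input /output.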
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xing</groupId>
    <artifactId>MapReduce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.1</version>
        </dependency>
    </dependencies>
</project>
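With this pom, mvn clean package builds target/MapReduce-1.0-SNAPSHOT.jar; the single hadoop-client dependency transitively pulls in the common, HDFS, and MapReduce client libraries that the three classes above compile against.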
I'm developing locally on Windows. If you don't know how to set up a local development environment, see my post 【Spark】Windows运行本地spark程序——JAVA版本 (running a local Spark program on Windows, Java edition); the setup process is exactly the same.
Result: