Preface
Implement a hand-written WordCount (WC) program and package it to run on a cluster.
Create a Maven project and import the POM
Project layout
pom.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhengkw</groupId>
    <artifactId>wordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <!--
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Packaging: the Maven assembly plugin builds an extra jar with all dependencies bundled -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.mr.hdfsmapredurce.WordcountDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
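Because the assembly plugin is bound to the package phase, a plain Maven build produces both jars; a minimal sketch of the build command:

```shell
# Build from the project root; the assembly plugin runs during the package phase
mvn clean package
```

Afterwards target/ contains wordcount-1.0-SNAPSHOT.jar (classes only) and wordcount-1.0-SNAPSHOT-jar-with-dependencies.jar (dependencies bundled).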
Mapper
```java
package com.mr.hdfsmapredurce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @ClassName: WordcountMapper
 * @author: zhengkw
 * @description: mapper
 * @date: 20/02/24 8:42 AM
 * @version: 1.0
 * @since: jdk 1.8
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // reuse the same key/value objects across calls to avoid extra allocations
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    /**
     * @param key
     * @param value
     * @param context
     * @description: override map to implement wordcount
     * @return: void
     * @date: 20/02/24 8:47 AM
     * @author: zhengkw
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line
        String line = value.toString().trim();
        // 2. Split it into words
        String[] words = line.split(" ");
        // 3. Emit <word, 1> for every word; the expected output is <hadoop,1> --> <string,int>
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
```
Reducer
```java
package com.mr.hdfsmapredurce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @ClassName: WordcountReduce
 * @author: zhengkw
 * @description: reducer
 * @date: 20/02/24 8:42 AM
 * @version: 1.0
 * @since: jdk 1.8
 */
public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    int sum;
    IntWritable v = new IntWritable();

    /**
     * @param key
     * @param values
     * @param context
     * @description: sum the counts for one word
     * @return: void
     * @date: 20/02/24 8:51 AM
     * @author: zhengkw
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 1. Sum up the counts; the field is reused across keys, so it must be reset first
        sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // 2. Emit <word, total>
        v.set(sum);
        context.write(key, v);
    }
}
```
Driver
```java
package com.mr.hdfsmapredurce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @description: driver that configures and submits the wordcount job
 * @date: 20/02/24 8:53 AM
 * @author: zhengkw
 */
public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Read the configuration and create the job instance from it
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2. Set the jar load path
        job.setJarByClass(WordcountDriver.class);
        // 3. Set the mapper and reducer classes
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReduce.class);
        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Set the input and output paths from the program arguments
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Submit and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
Note!!!
The input and output paths are received through args[] in main! When testing, you must pass them in at run time as the program arguments pathIN and pathOUT, as sketched below.
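For example, an IDE run configuration might pass the two paths like this (both paths are hypothetical; adjust them to your environment):

```shell
# Program arguments: args[0] = pathIN, args[1] = pathOUT
# The output path must not exist yet, or FileOutputFormat will fail the job
/user/zhengkw/input /user/zhengkw/output
```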
Package and run on the cluster
Packaging produces two kinds of jars: one with all dependencies bundled (built by the assembly plugin) and one without.
Choosing a jar
Which jar to pick depends on whether the environment it will run in already provides the dependencies. The cluster already ships with all the Hadoop dependencies, so the jar without dependencies is the one to choose!
Upload the jar to the cluster and run it
To run the hand-written WC jar, use bin/hadoop jar followed by the jar name and the fully qualified class name of the driver!
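A minimal sketch of the command, run from the Hadoop installation directory (the jar name follows the POM above; the input/output paths are assumptions):

```shell
# bin/hadoop jar <jar> <fully qualified driver class> <input path> <output path>
bin/hadoop jar wordcount-1.0-SNAPSHOT.jar com.mr.hdfsmapredurce.WordcountDriver /user/zhengkw/input /user/zhengkw/output
```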
Prepare the data
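For example, a small text file can serve as input; the file name and HDFS directory below are assumptions:

```shell
# Create the input directory in HDFS and upload a local sample file
bin/hdfs dfs -mkdir -p /user/zhengkw/input
bin/hdfs dfs -put wc.input /user/zhengkw/input
```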
Test
The job runs successfully.
Verify
View the results
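With a single reducer the job writes one result file, part-r-00000, under the output directory, so the word counts can be inspected like this (paths assumed as above):

```shell
# List the output directory and print the result file
bin/hdfs dfs -ls /user/zhengkw/output
bin/hdfs dfs -cat /user/zhengkw/output/part-r-00000
```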
Summary
- When packaging a jar to run on the cluster, the jar without bundled dependencies is enough.
- When running the jar, hadoop jar must be followed by the fully qualified class name of your own driver. This matters: otherwise the run fails with a class-not-found error!
- The program's input and output paths must be passed through the args array; they must not be hard-coded!