Preface
Both HDFS and MapReduce handle small files inefficiently, yet workloads with large numbers of small files are often unavoidable, so a solution is needed. One approach is to implement a custom InputFormat that merges the small files.
Requirements
Merge multiple small files into a single SequenceFile (the Hadoop file format for storing binary key-value pairs). The SequenceFile holds all of the small files, using each file's path + name as the key and its contents as the value.
- To merge everything into one output file, reading must not follow FileInputFormat's default splitting rules; instead, each file must be cut as exactly one split.
- This requires a custom input format that forces each file's split count to be exactly 1. The split-computation rules themselves are not covered in detail here, but the relevant branch is sketched below.
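For reference, the decisive branch inside FileInputFormat.getSplits() looks essentially like this (paraphrased and heavily simplified from the Hadoop source, not a verbatim copy): when isSplitable() returns false, the whole file becomes a single FileSplit.

// Simplified essence of FileInputFormat.getSplits(), not a verbatim copy:
if (isSplitable(job, path)) {
    // ... carve the file into block-sized splits ...
} else {
    // One split covering the entire file.
    splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
}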
WholeFileInputformat
package com.zhengkw.inputformat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * @ClassName: WholeFileInputformat
 * @author: zhengkw
 * @description: Custom InputFormat. Extends FileInputFormat and feeds whole files to the mapper as its input key-value pairs.
 * @date: 20/02/25 11:36 AM
 * @version: 1.0
 * @since: jdk 1.8
 */
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WhoFileRecordReader recordReader = new WhoFileRecordReader();
        // Pass the split info to the reader up front; the framework will also
        // call initialize() again before the reader is used, which is harmless.
        recordReader.initialize(inputSplit, context);
        return recordReader;
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Guarantee that each file is exactly one split.
        return false;
    }
}
WhoFileRecordReader
package com.zhengkw.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * @ClassName: WhoFileRecordReader
 * @author: zhengkw
 * @description: Reads one split (one whole file) and hands it to the mapper in the key-value form the mapper expects.
 * @date: 20/02/25 11:41 AM
 * @version: 1.0
 * @since: jdk 1.8
 */
public class WhoFileRecordReader extends RecordReader<Text, BytesWritable> {

    private FileSplit split;
    private final Text k = new Text();
    private final BytesWritable v = new BytesWritable();
    // Flag: true while the split still has to be read.
    private boolean isProgress = true;
    private Configuration conf;

    /**
     * @param inputSplit supplied by the InputFormat
     * @param context    supplied by the InputFormat
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Keep a reference to the file split.
        split = (FileSplit) inputSplit;
        // Use the job configuration from the task context.
        conf = context.getConfiguration();
    }

    /**
     * Reads the whole split and assigns the key and value fields.
     * Called by Mapper.run(); returns true exactly once per file.
     */
    @Override
    public boolean nextKeyValue() throws IOException {
        // initialize() has already supplied the split and the configuration.
        if (isProgress) {
            // Buffer for the whole split; the content is binary, so use a byte array.
            byte[] contents = new byte[(int) split.getLength()];
            // Get the file system for the split's path and open an input stream.
            FileSystem fileSystem = split.getPath().getFileSystem(conf);
            FSDataInputStream fis = fileSystem.open(split.getPath());
            try {
                // Read the entire file content.
                IOUtils.readFully(fis, contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(fis);
            }
            // Value = the file content.
            v.set(contents, 0, contents.length);
            // Key = the file path.
            k.set(split.getPath().toString());
            // Flip the flag so the next call returns false and run() exits its loop.
            isProgress = false;
            // Returning true makes run() invoke map() for this file.
            return true;
        }
        // Returning false ends the loop in run().
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return v;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
    }
}
Mapper
package com.zhengkw.inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @ClassName: SequenceFileMapper
 * @author: zhengkw
 * @description: Passes each (file path, file content) pair through unchanged.
 * @date: 20/02/25 11:34 AM
 * @version: 1.0
 * @since: jdk 1.8
 */
public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}
Reducer
package com.zhengkw.inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @ClassName: SequenceFileReducer
 * @author: zhengkw
 * @description: Writes every (file path, file content) pair out to the SequenceFile.
 * @date: 20/02/25 12:24 PM
 * @version: 1.0
 * @since: jdk 1.8
 */
public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}
Driver
package com.zhengkw.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

/**
 * @ClassName: WholeFileDriver
 * @author: zhengkw
 * @description: Job driver: wires up the custom input format, mapper, reducer, and SequenceFile output.
 * @date: 20/02/25 12:27 PM
 * @version: 1.0
 * @since: jdk 1.8
 */
public class WholeFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path input = new Path("F:/input");
        Path output = new Path("f:/output1");

        // 1. Get the job object.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Delete the output directory if it already exists.
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(output)) {
            fileSystem.delete(output, true);
        }

        // 2. Set the jar location and hook up the custom mapper and reducer.
        job.setJarByClass(WholeFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        // 7. Set the input format.
        job.setInputFormatClass(WholeFileInputformat.class);
        // 8. Set the output format.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 3. Set the map output key-value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 4. Set the final output key-value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 5. Set the input and output paths.
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        // 6. Submit the job.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
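Once the job succeeds, the merged result can be checked by reading the SequenceFile back. Below is a minimal sketch; the SequenceFileDump class name is made up for illustration, and it assumes the default single-reducer output file part-r-00000 under the output directory used above.

package com.zhengkw.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.IOException;

// Minimal sketch: print each key (file path) and value size from the merged SequenceFile.
public class SequenceFileDump {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Assumed default output file name for a single-reducer job.
        Path merged = new Path("f:/output1/part-r-00000");
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(merged))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}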
Summary
Map phase
When the mapper's run() method executes, its while loop calls nextKeyValue(), which is the RecordReader's nextKeyValue(); since our custom RecordReader overrides it, the loop can only exit once our implementation returns false. Each RecordReader reads exactly one split, i.e. one whole file.
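For reference, the structure of Mapper.run() is essentially the following (paraphrased from the Hadoop source; recent versions wrap the loop in try/finally). This is why returning true once and then false drives exactly one map() call per file.

// Essence of org.apache.hadoop.mapreduce.Mapper.run() (paraphrased):
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        // context.nextKeyValue() delegates to the RecordReader, i.e. our
        // WhoFileRecordReader: true on the first call, false afterwards.
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}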