
Debugging Hadoop jobs with Eclipse is simple and convenient


Debugging Hadoop jobs from Eclipse is simple and convenient. I have developed Hadoop programs with Eclipse before, but I had never looked closely at Eclipse's run modes, so every now and then I would hit a baffling exception. The most common one is the following:

java.lang.RuntimeException: java.lang.ClassNotFoundException: com.qin.sort.TestSort$SMapper
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:857)
    at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:199)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:718)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)


This exception is the most baffling of all: the Mapper inner class is plainly right there in your MR class, yet the moment you run the program it complains that the class cannot be found, and you can hunt for the cause all day without getting anywhere.
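It happens when the job is submitted to a real cluster without a jar containing your classes, so the TaskTrackers have nothing to load the Mapper from. As a preview of the fix shown later in this article, the relevant lines in main look roughly like this (a minimal sketch using the jar name tt.jar and the JobTracker address from the code below):

JobConf conf = new JobConf();
conf.set("mapred.job.tracker", "192.168.75.130:9001"); // submit to the cluster instead of running locally
conf.setJar("tt.jar"); // ship our classes with the job; without this the cluster cannot load TestSort$SMapper
Job job = new Job(conf, "sort123");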

In fact this is not a problem in the program at all; it comes from not understanding Eclipse's run modes well enough. Broadly, there are two ways to run Hadoop from Eclipse. The first is Local mode (also called standalone mode); the second is the real, on-cluster mode. In Local mode the program is not submitted to the Hadoop cluster: it runs on a single machine, although its results are still stored on HDFS. It simply does not use the cluster's resources and does not require submitting a jar to the cluster, which is why Local mode is what we normally use to check that an MR program runs correctly.
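Because the mode is decided purely by configuration, you can tell which one you are in by reading mapred.job.tracker, which defaults to "local" in Hadoop 1.x. A small self-contained check (a hypothetical helper class, not part of the original program):

import org.apache.hadoop.conf.Configuration;

public class ModeCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "local" means the in-process LocalJobRunner: no cluster, no job jar needed.
        // A host:port value means jobs go to that JobTracker and need a job jar.
        String tracker = conf.get("mapred.job.tracker", "local");
        System.out.println("mapred.job.tracker = " + tracker);
    }
}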
Below, let's look at a sort job run in Local mode.

The data to sort:

a 784
b 12
c -11
dd 99999


Program source:

package com.qin.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MR job class that tests sorting.
 *
 * QQ tech discussion group: 324714439
 * @author qindongliang
 **/
public class TestSort {

    /**
     * Map class
     **/
    private static class SMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

        private Text text = new Text(); // output value
        private static final IntWritable one = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String s = value.toString();
            String ss[] = s.split(" ");
            one.set(Integer.parseInt(ss[1].trim())); // the number becomes the key, so the framework sorts on it
            text.set(ss[0].trim());
            context.write(one, text);
        }
    }

    /**
     * Reduce class
     **/
    private static class SReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
        private Text text = new Text();
        @Override
        protected void reduce(IntWritable arg0, Iterable<Text> arg1, Context context)
                throws IOException, InterruptedException {
            for (Text t : arg1) {
                text.set(t.toString());
                context.write(text, arg0); // swap back to (name, number) on output
            }
        }
    }

    /**
     * Sort comparator class
     **/
    private static class SSort extends WritableComparator {

        public SSort() {
            super(IntWritable.class, true); // register the comparator for IntWritable keys
        }

        @Override
        public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
                int arg4, int arg5) {
            // The minus sign is the key to controlling ascending vs. descending:
            // negating the default comparison yields descending order.
            return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);
        }

        @Override
        public int compare(Object a, Object b) {
            return -super.compare(a, b); // negate here too for descending order
        }
    }

    /**
     * main method
     **/
    public static void main(String[] args) throws Exception {
        String inputPath = "hdfs://192.168.75.130:9000/root/output";
        String outputPath = "hdfs://192.168.75.130:9000/root/outputsort";
        JobConf conf = new JobConf();
        //Configuration conf = new Configuration();
        // Automatically prepends hdfs://master:9000/ to your file paths:
        //conf.set("fs.default.name", "hdfs://192.168.75.130:9000");
        // Specifies the JobTracker host and port; "master" can be mapped in /etc/hosts:
        //conf.set("mapred.job.tracker", "192.168.75.130:9001");
        System.out.println("Mode:  " + conf.get("mapred.job.tracker"));
        //conf.setJar("tt.jar");
        FileSystem fs = FileSystem.get(conf);
        Path pout = new Path(outputPath);
        if (fs.exists(pout)) {
            fs.delete(pout, true);
            System.out.println("Output path exists; deleted......");
        }
        Job job = new Job(conf, "sort123");
        job.setJarByClass(TestSort.class);
        job.setOutputKeyClass(IntWritable.class); // tell map/reduce the output key type
        FileInputFormat.setInputPaths(job, new Path(inputPath));   // input path
        FileOutputFormat.setOutputPath(job, new Path(outputPath)); // output path
        job.setMapperClass(SMapper.class);       // map class
        job.setReducerClass(SReduce.class);      // reduce class
        job.setSortComparatorClass(SSort.class); // sort comparator class
        //job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
        //job.setOutputFormatClass(TextOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


The console output is as follows:

Mode:  local
Output path exists; deleted......
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN - JobClient.copyAndConfigureFiles(870) | No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_local1242054158_0001
INFO - LocalJobRunner$Job.run(340) | Waiting for map tasks
INFO - LocalJobRunner$Job$MapTaskRunnable.run(204) | Starting task: attempt_local1242054158_0001_m_000000_0
INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null
INFO - MapTask.runNewMapper(729) | Processing split: hdfs://192.168.75.130:9000/root/output/sort.txt:0+28
INFO - MapTask$MapOutputBuffer.<init>(949) | io.sort.mb = 100
INFO - MapTask$MapOutputBuffer.<init>(961) | data buffer = 79691776/99614720
INFO - MapTask$MapOutputBuffer.<init>(962) | record buffer = 262144/327680
INFO - MapTask$MapOutputBuffer.flush(1289) | Starting flush of map output
INFO - MapTask$MapOutputBuffer.sortAndSpill(1471) | Finished spill 0
INFO - Task.done(858) | Task:attempt_local1242054158_0001_m_000000_0 is done. And is in the process of commiting
INFO - LocalJobRunner$Job.statusUpdate(466) |
INFO - Task.sendDone(970) | Task 'attempt_local1242054158_0001_m_000000_0' done.
INFO - LocalJobRunner$Job$MapTaskRunnable.run(229) | Finishing task: attempt_local1242054158_0001_m_000000_0
INFO - LocalJobRunner$Job.run(348) | Map task executor complete.
INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null
INFO - LocalJobRunner$Job.statusUpdate(466) |
INFO - Merger$MergeQueue.merge(408) | Merging 1 sorted segments
INFO - Merger$MergeQueue.merge(491) | Down to the last merge-pass, with 1 segments left of total size: 35 bytes
INFO - LocalJobRunner$Job.statusUpdate(466) |
INFO - Task.done(858) | Task:attempt_local1242054158_0001_r_000000_0 is done. And is in the process of commiting
INFO - LocalJobRunner$Job.statusUpdate(466) |
INFO - Task.commit(1011) | Task attempt_local1242054158_0001_r_000000_0 is allowed to commit now
INFO - FileOutputCommitter.commitTask(173) | Saved output of task 'attempt_local1242054158_0001_r_000000_0' to hdfs://192.168.75.130:9000/root/outputsort
INFO - LocalJobRunner$Job.statusUpdate(466) | reduce > reduce
INFO - Task.sendDone(970) | Task 'attempt_local1242054158_0001_r_000000_0' done.
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_local1242054158_0001
INFO - Counters.log(585) | Counters: 19
INFO - Counters.log(587) |   File Output Format Counters
INFO - Counters.log(589) |     Bytes Written=26
INFO - Counters.log(587) |   File Input Format Counters
INFO - Counters.log(589) |     Bytes Read=28
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=393
INFO - Counters.log(589) |     HDFS_BYTES_READ=56
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=135742
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=26
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=39
INFO - Counters.log(589) |     Map input records=4
INFO - Counters.log(589) |     Reduce shuffle bytes=0
INFO - Counters.log(589) |     Spilled Records=8
INFO - Counters.log(589) |     Map output bytes=25
INFO - Counters.log(589) |     Total committed heap usage (bytes)=455475200
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112
INFO - Counters.log(589) |     Reduce input records=4
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Reduce output records=4
INFO - Counters.log(589) |     Map output records=4



The sorted result is as follows:

dd	99999
a	784
b	12
c	-11


Once the program passes in local mode, we can run it on the Hadoop cluster. There are two ways to do this. The first, for convenience, is to package the whole project into a jar, upload it to the Linux machine, and run the shell command:
bin/hadoop jar tt.jar com.qin.sort.TestSort
to run the test. Note that because I hard-coded the paths in the program for convenience, no input or output paths follow the class name. In real development, for flexibility, the input and output paths are usually passed in from outside, as sketched below.
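A hypothetical variant of TestSort's main that reads the paths from the command line instead (only the first lines change; the rest of the job setup stays exactly as shown above):

public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: hadoop jar tt.jar com.qin.sort.TestSort <input path> <output path>");
        System.exit(2);
    }
    String inputPath = args[0];   // e.g. hdfs://192.168.75.130:9000/root/output
    String outputPath = args[1];  // e.g. hdfs://192.168.75.130:9000/root/outputsort
    // ... build the JobConf and Job exactly as before ...
}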

The second way is also convenient: submit the job to the Hadoop cluster directly from Eclipse. Even then the whole project still has to be packaged into a jar; the difference is that now our Eclipse-side code submits the job for us, so we can submit and run Hadoop jobs straight from Windows. The mainstream approach, though, is still to upload the jar. For packaging the whole project into one jar, I am attaching an Ant script below; just run it and it bundles the interdependent classes together, treating the entire project as one unit, so that on the Hadoop side we only need to specify the jar, the fully qualified class name, and the input and output paths. The Ant script is as follows:

<project name="${component.name}" basedir="." default="jar">
    <property environment="env"/>
    <!--
    <property name="hadoop.home" value="${env.HADOOP_HOME}"/>
    -->
    <property name="hadoop.home" value="D:/hadoop-1.2.0"/>
    <!-- name of the jar to build -->
    <property name="jar.name" value="tt.jar"/>
    <path id="project.classpath">
        <fileset dir="lib">
            <include name="*.jar" />
        </fileset>
        <fileset dir="${hadoop.home}">
            <include name="**/*.jar" />
        </fileset>
    </path>
    <target name="clean" >
        <delete dir="bin" failonerror="false" />
        <mkdir dir="bin"/>
    </target>
    <target name="build" depends="clean">
        <echo message="${ant.project.name}: ${ant.file}"/>
        <javac destdir="bin" encoding="utf-8" debug="true" includeantruntime="false" debuglevel="lines,vars,source">
            <src path="src"/>
            <exclude name="**/.svn" />
            <classpath refid="project.classpath"/>
        </javac>
        <copy todir="bin">
            <fileset dir="src">
                <include name="*config*"/>
            </fileset>
        </copy>
    </target>

    <target name="jar" depends="build">
        <copy todir="bin/lib">
            <fileset dir="lib">
                <include name="**/*.*"/>
            </fileset>
        </copy>

        <path id="lib-classpath">
            <fileset dir="lib" includes="**/*.jar" />
        </path>

        <pathconvert property="my.classpath" pathsep=" " >
            <mapper>
                <chainedmapper>
                    <!-- strip the absolute path -->
                    <flattenmapper />
                    <!-- add the lib/ prefix -->
                    <globmapper from="*" to="lib/*" />
                </chainedmapper>
            </mapper>
            <path refid="lib-classpath" />
        </pathconvert>

        <jar basedir="bin" destfile="${jar.name}" >
            <include name="**/*"/>
            <!-- define MANIFEST.MF -->
            <manifest>
                <attribute name="Class-Path" value="${my.classpath}" />
            </manifest>
        </jar>
    </target>
</project>


After running this Ant script (its default target is jar, so running plain ant in the project directory is enough), the whole project is packaged into tt.jar.


With the jar in place, let's first test submitting the job to the Hadoop cluster from Eclipse. Only the main method needs small changes: point mapred.job.tracker at the cluster and set the job jar:

/**
 * main method
 **/
public static void main(String[] args) throws Exception {
    String inputPath = "hdfs://192.168.75.130:9000/root/output";
    String outputPath = "hdfs://192.168.75.130:9000/root/outputsort";
    JobConf conf = new JobConf();
    //Configuration conf = new Configuration(); // this conf can be used to test Local mode
    // If mapred-site.xml is on the classpath (e.g. under src), the next line is unnecessary;
    // note it is only used in non-Local mode:
    conf.set("mapred.job.tracker", "192.168.75.130:9001");
    System.out.println("Mode:  " + conf.get("mapred.job.tracker"));
    conf.setJar("tt.jar"); // non-Local mode only: ship our classes to the cluster with the job
    FileSystem fs = FileSystem.get(conf);
    Path pout = new Path(outputPath);
    if (fs.exists(pout)) {
        fs.delete(pout, true);
        System.out.println("Output path exists; deleted......");
    }
    Job job = new Job(conf, "sort123");
    job.setJarByClass(TestSort.class);
    job.setOutputKeyClass(IntWritable.class); // tell map/reduce the output key type
    FileInputFormat.setInputPaths(job, new Path(inputPath));   // input path
    FileOutputFormat.setOutputPath(job, new Path(outputPath)); // output path
    job.setMapperClass(SMapper.class);       // map class
    job.setReducerClass(SReduce.class);      // reduce class
    job.setSortComparatorClass(SSort.class); // sort comparator class
    //job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
    //job.setOutputFormatClass(TextOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}



Running the program prints the following output:

Mode:  192.168.75.130:9001
Output path exists; deleted......
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201403252058_0035
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201403252058_0035
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=8498
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=1
INFO - Counters.log(589) |     Data-local map tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9667
INFO - Counters.log(587) |   File Output Format Counters
INFO - Counters.log(589) |     Bytes Written=26
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=39
INFO - Counters.log(589) |     HDFS_BYTES_READ=140
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=117654
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=26
INFO - Counters.log(587) |   File Input Format Counters
INFO - Counters.log(589) |     Bytes Read=28
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=39
INFO - Counters.log(589) |     Map input records=4
INFO - Counters.log(589) |     Reduce shuffle bytes=39
INFO - Counters.log(589) |     Spilled Records=8
INFO - Counters.log(589) |     Map output bytes=25
INFO - Counters.log(589) |     Total committed heap usage (bytes)=176033792
INFO - Counters.log(589) |     CPU time spent (ms)=1140
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112
INFO - Counters.log(589) |     Reduce input records=4
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=259264512
INFO - Counters.log(589) |     Reduce output records=4
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=1460555776
INFO - Counters.log(589) |     Map output records=4


We can see it ran correctly; the sorted output is as follows:

dd	99999
a	784
b	12
c	-11


The result is the same as in local mode. One more difference from local mode: we can find the job we just ran on the JobTracker task page at http://192.168.75.130:50030/jobtracker.jsp, which never shows jobs run in Local mode.

Finally, let's look at uploading the jar to Linux and submitting the job to the Hadoop cluster from there. We already built the jar, so we only need to upload it to the Linux machine and run the program with the shell command:

bin/hadoop jar tt.jar com.qin.sort.TestSort

With that, the job runs to completion successfully. One last thing to note: if the sort job fails with the exception:

java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, recieved org.apache.hadoop.io.IntWritable
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, recieved org.apache.hadoop.io.IntWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1019)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:690)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at com.qin.sort.TestSort$SMapper.map(TestSort.java:51)
    at com.qin.sort.TestSort$SMapper.map(TestSort.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)



This exception usually appears because the output key or value class was not specified, or the specified types do not match what the map actually emits. The fix is simply to set the output key and value types correctly:

job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);




After setting these, the job runs normally again.
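One more detail worth knowing: in this job the map output types (IntWritable key, Text value) differ from the reduce output types (Text key, IntWritable value). For such asymmetric jobs, Hadoop's Job API also has separate setters for the map side. A fully explicit configuration would look like this sketch (standard org.apache.hadoop.mapreduce.Job methods; the combination below is my suggestion, not code from the original program):

job.setMapOutputKeyClass(IntWritable.class);   // key type SMapper emits
job.setMapOutputValueClass(Text.class);        // value type SMapper emits
job.setOutputKeyClass(Text.class);             // key type SReduce emits
job.setOutputValueClass(IntWritable.class);    // value type SReduce emits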
