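An earlier post showed how to operate a standalone HBase instance from Eclipse. If you are not familiar with that setup, have a look at it first:
Connecting to and operating standalone HBase from Eclipse
This post walks through a MapReduce job that reads data from HBase and computes over it. It is similar to wordcount, except that the input is read from an HBase table.
First, we need to create the input source.
Start HBase and open the HBase shell. My configuration here is no longer standalone; it uses HDFS as the file system: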
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:9000/hbase</value>
    <description>Where the data is stored.</description>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <description>Replica count is set to 1 because this is a pseudo-distributed setup.</description>
  </property>
</configuration>
Once inside the HBase shell, create the tables:
hbase(main):007:0> create 'data_input', 'message'
0 row(s) in 1.1110 seconds

hbase(main):008:0> create 'data_output', {NAME=>'message', VERSIONS=>1}
0 row(s) in 1.0900 seconds
The data_input table holds the input data for the MapReduce job.
The data_output table holds the output of the MapReduce job.
Next, fill data_input with random data. Here the rows are written from a Java program run in Eclipse:
package hbase_mapred1;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class Importer1 {

    public static void main(String[] args) throws Exception {
        String[] pages = {"/", "/a.html", "/b.html", "/c.html"};

        Configuration hbaseConfig = HBaseConfiguration.create();
        HTable htable = new HTable(hbaseConfig, "data_input");
        // Buffer puts on the client and send them in batches.
        htable.setAutoFlush(false);
        htable.setWriteBufferSize(1024 * 1024 * 12);

        int totalRecords = 100000;
        int maxID = totalRecords / 1000;   // 100 distinct user IDs
        Random rand = new Random();
        System.out.println("importing " + totalRecords + " records ....");

        for (int i = 0; i < totalRecords; i++) {
            int userID = rand.nextInt(maxID) + 1;
            // Composite row key: 4-byte userID followed by the 4-byte counter i.
            byte[] rowkey = Bytes.add(Bytes.toBytes(userID), Bytes.toBytes(i));
            String randomPage = pages[rand.nextInt(pages.length)];
            Put put = new Put(rowkey);
            put.add(Bytes.toBytes("message"), Bytes.toBytes("page"), Bytes.toBytes(randomPage));
            htable.put(put);
        }

        htable.flushCommits();
        htable.close();
        System.out.println("done");
    }
}
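The composite row key is what the rest of the job depends on, so it may help to see its layout in isolation. The following is a minimal sketch using only the JDK's `ByteBuffer` (big-endian by default, which matches how `Bytes.toBytes(int)` encodes an int). The class name `RowKeyLayout` and its helper methods are hypothetical and are not part of the original program.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, not part of the original code: shows the 8-byte key layout.
class RowKeyLayout {

    // 4-byte big-endian user ID followed by a 4-byte big-endian counter.
    static byte[] compositeKey(int userId, int counter) {
        return ByteBuffer.allocate(2 * Integer.BYTES)
                .putInt(userId)
                .putInt(counter)
                .array();
    }

    // Reads the user ID back from the first four bytes of the key.
    static int userIdOf(byte[] rowKey) {
        return ByteBuffer.wrap(rowKey, 0, Integer.BYTES).getInt();
    }

    public static void main(String[] args) {
        byte[] key = compositeKey(42, 7);
        System.out.println(Arrays.toString(key)); // [0, 0, 0, 42, 0, 0, 0, 7]
        System.out.println(userIdOf(key));        // 42
    }
}
```

Because the user ID occupies a fixed-width prefix, all rows of one user share the same first four bytes, which is what makes grouping by that prefix possible.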
At this point the data has been written to the data_input table. Next, we use that table as the input of a MapReduce job.
The code is as follows:
package hbase_mapred1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;

public class FreqCounter1 {

    static class Mapper1 extends TableMapper<ImmutableBytesWritable, IntWritable> {

        private int numRecords = 0;
        private static final IntWritable one = new IntWritable(1);

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException, InterruptedException {
            // Extract the user key from the composite key (userId + counter).
            ImmutableBytesWritable userKey =
                    new ImmutableBytesWritable(row.get(), 0, Bytes.SIZEOF_INT);
            context.write(userKey, one);
            numRecords++;
            if ((numRecords % 10000) == 0) {
                context.setStatus("mapper processed " + numRecords + " records so far");
            }
        }
    }

    public static class Reducer1
            extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the 1s emitted by the mapper for this user.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            // Store the total in data_output under message:total.
            Put put = new Put(key.get());
            put.add(Bytes.toBytes("message"), Bytes.toBytes("total"), Bytes.toBytes(sum));
            System.out.println(String.format("stats : key : %d, count : %d",
                    Bytes.toInt(key.get()), sum));
            context.write(key, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "Hbase_FreqCounter1");
        job.setJarByClass(FreqCounter1.class);

        Scan scan = new Scan();
        // Only the row key is needed, so return just the first cell of each row.
        scan.setFilter(new FirstKeyOnlyFilter());

        TableMapReduceUtil.initTableMapperJob("data_input", scan, Mapper1.class,
                ImmutableBytesWritable.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("data_output", Reducer1.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
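The overall logic is:
The map phase reads each row of data_input, takes the user ID from the row key, and emits it with a count of 1. Anyone familiar with the MapReduce flow should find this easy to follow.
During the shuffle phase, records from data_input with the same key are grouped together. The reduce phase then adds up the 1s for each key to get a total, and writes that total to the data_output table.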
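The same map, shuffle, and reduce steps can be imitated in memory without a cluster, which may make the data flow easier to see. This is only a sketch under simplifying assumptions: it uses plain JDK collections rather than Hadoop, and the class `FreqCountSketch` and its methods are hypothetical names introduced for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory imitation of the job; it does not use Hadoop or HBase.
class FreqCountSketch {

    // Builds a composite key the same way the importer does: userId + counter.
    static byte[] rowKey(int userId, int counter) {
        return ByteBuffer.allocate(2 * Integer.BYTES).putInt(userId).putInt(counter).array();
    }

    // "Map" step: keep only the 4-byte user ID prefix of the row key.
    static int mapKey(byte[] rowKey) {
        return ByteBuffer.wrap(rowKey, 0, Integer.BYTES).getInt();
    }

    // "Shuffle" and "reduce" steps: group by user ID and add 1 per row.
    static Map<Integer, Integer> countByUser(List<byte[]> rowKeys) {
        Map<Integer, Integer> totals = new TreeMap<>();
        for (byte[] key : rowKeys) {
            totals.merge(mapKey(key), 1, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<byte[]> rows = new ArrayList<>();
        rows.add(rowKey(1, 0));
        rows.add(rowKey(2, 1));
        rows.add(rowKey(1, 2));
        System.out.println(countByUser(rows)); // {1=2, 2=1}
    }
}
```

In the real job the grouping is done by the framework between the mapper and the reducer; here a single map lookup stands in for it.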
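Finally, let's verify the result.
Because the values in HBase are stored as raw bytes and cannot be read directly, a small program converts them into a readable form. The code is as follows: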
package hbase_mapred1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class PrintUserCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable htable = new HTable(conf, "data_output");

        Scan scan = new Scan();
        ResultScanner scanner = htable.getScanner(scan);
        Result r;
        while ((r = scanner.next()) != null) {
            // The row key is the 4-byte user ID written by the reducer.
            int userId = Bytes.toInt(r.getRow());
            // message:total holds the count as a 4-byte int.
            byte[] totalValue = r.getValue(Bytes.toBytes("message"), Bytes.toBytes("total"));
            int count = Bytes.toInt(totalValue);
            System.out.println("key: " + userId + ", count: " + count);
        }
        scanner.close();
        htable.close();
    }
}
key: 1, count: 1007
key: 2, count: 1034
key: 3, count: 962
key: 4, count: 1001
key: 5, count: 1024
key: 6, count: 1033
key: 7, count: 984
key: 8, count: 987
key: 9, count: 988
key: 10, count: 990
key: 11, count: 1069
key: 12, count: 965
key: 13, count: 1000
key: 14, count: 998
key: 15, count: 1002
key: 16, count: 983
...
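To see why the stored values are not readable as they are, consider how an int such as the first count above (1007) looks once it is encoded as four bytes. The sketch below uses only the JDK and assumes the same big-endian encoding as `Bytes.toBytes(int)`. The class `IntCellSketch` is a hypothetical name, and the `\xNN` escape style is only meant to resemble how binary values are typically displayed, not to reproduce any tool's exact output.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the original code.
class IntCellSketch {

    // Encodes an int as 4 big-endian bytes.
    static byte[] encode(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Decodes 4 big-endian bytes back into an int.
    static int decode(byte[] cell) {
        return ByteBuffer.wrap(cell).getInt();
    }

    // Renders each byte as an escaped hex pair, e.g. \x03.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("\\x%02X", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] cell = encode(1007);
        System.out.println(hex(cell));    // \x00\x00\x03\xEF
        System.out.println(decode(cell)); // 1007
    }
}
```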
Note that the required jars must be added to the project. The directory structure is: