
HBase + MapReduce + Eclipse Example


A previous blog post walked through an example of operating a standalone HBase from Eclipse; readers unfamiliar with that setup may want to read it first:

Connecting to and operating a standalone HBase from Eclipse


This post presents a MapReduce example that reads its input data from HBase and runs a computation over it, much like WordCount, except that the input comes from an HBase table rather than from files. Three small programs are involved: Importer1 generates the input data, FreqCounter1 runs the MapReduce job, and PrintUserCount prints the results.


First, the input source needs to be created.

Start HBase and open the HBase shell. Note that my configuration file no longer uses standalone mode; HDFS now serves as the underlying filesystem:


<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:9000/hbase</value>
    <description>Where the data is stored.</description>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <description>Replication factor of 1, since this is a pseudo-distributed setup.</description>
  </property>
</configuration>
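
With HBase restarted on this configuration, it is worth confirming that an Eclipse client can actually reach the cluster before moving on. Below is a minimal sketch of such a check — my own addition, not from the original post — using the same classic (pre-1.0) client API as the rest of the code here; the class name CheckConnection is hypothetical:

package hbase_mapred1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CheckConnection {
    public static void main(String[] args) throws Exception {
        // Reads hbase-site.xml from the classpath, so the hbase.rootdir above applies
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // Throws MasterNotRunningException if the cluster cannot be reached
        System.out.println("HBase master running: " + admin.isMasterRunning());
        admin.close();
    }
}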

After entering the HBase shell, create the tables:

hbase(main):007:0> create 'data_input', 'message'
0 row(s) in 1.1110 seconds
hbase(main):008:0> create 'data_output', {NAME => 'message', VERSIONS => 1}
0 row(s) in 1.0900 seconds

The data_input table holds the MapReduce job's input data.

The data_output table holds the MapReduce job's output.
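
As an aside, the same two tables can also be created from Eclipse rather than the shell. Here is a sketch of the equivalent calls with the classic HBaseAdmin API — my own addition, with a hypothetical class name, not part of the original post:

package hbase_mapred1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTables {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        // Equivalent of: create 'data_input', 'message'
        HTableDescriptor input = new HTableDescriptor("data_input");
        input.addFamily(new HColumnDescriptor("message"));
        admin.createTable(input);

        // Equivalent of: create 'data_output', {NAME => 'message', VERSIONS => 1}
        HTableDescriptor output = new HTableDescriptor("data_output");
        HColumnDescriptor message = new HColumnDescriptor("message");
        message.setMaxVersions(1);
        output.addFamily(message);
        admin.createTable(output);

        admin.close();
    }
}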


Next, populate the data_input table with random data. Eclipse and the HBase Java API are used here to write the rows into data_input; the code is as follows:


package hbase_mapred1;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class Importer1 {

    public static void main(String[] args) throws Exception {

        String[] pages = {"/", "/a.html", "/b.html", "/c.html"};

        Configuration hbaseConfig = HBaseConfiguration.create();
        HTable htable = new HTable(hbaseConfig, "data_input");
        // Buffer puts client-side instead of sending them one at a time
        htable.setAutoFlush(false);
        htable.setWriteBufferSize(1024 * 1024 * 12);

        int totalRecords = 100000;
        int maxID = totalRecords / 1000;
        Random rand = new Random();
        System.out.println("importing " + totalRecords + " records ....");
        for (int i = 0; i < totalRecords; i++) {
            int userID = rand.nextInt(maxID) + 1;
            // Composite row key: 4-byte userID followed by the 4-byte loop counter
            byte[] rowkey = Bytes.add(Bytes.toBytes(userID), Bytes.toBytes(i));
            String randomPage = pages[rand.nextInt(pages.length)];
            Put put = new Put(rowkey);
            put.add(Bytes.toBytes("message"), Bytes.toBytes("page"), Bytes.toBytes(randomPage));
            htable.put(put);
        }
        // Push any puts still sitting in the client-side write buffer
        htable.flushCommits();
        htable.close();
        System.out.println("done");
    }
}
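
Note the row key above: it is the 4-byte user ID followed by the 4-byte loop counter, so every row is unique while all rows for the same user share a prefix. The mapper later relies on exactly this layout. A tiny standalone sketch of it — my own addition, using only the Bytes utility class:

package hbase_mapred1;

import org.apache.hadoop.hbase.util.Bytes;

public class RowKeyDemo {
    public static void main(String[] args) {
        // Same composition as in Importer1: userID first, then the counter i
        byte[] rowkey = Bytes.add(Bytes.toBytes(42), Bytes.toBytes(7));
        System.out.println("length : " + rowkey.length);           // 8 = two 4-byte ints
        System.out.println("userID : " + Bytes.toInt(rowkey, 0));  // 42
        System.out.println("counter: " + Bytes.toInt(rowkey, 4));  // 7
    }
}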

At this point the data has been written into the data_input table. Next, that table's contents are used as the input of the MapReduce job. The code is as follows:


package hbase_mapred1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;

public class FreqCounter1 {

    static class Mapper1 extends TableMapper<ImmutableBytesWritable, IntWritable> {

        private int numRecords = 0;
        private static final IntWritable one = new IntWritable(1);

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
            // Extract the userKey from the composite key (userId + counter)
            ImmutableBytesWritable userKey = new ImmutableBytesWritable(row.get(), 0, Bytes.SIZEOF_INT);
            try {
                context.write(userKey, one);
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
            numRecords++;
            if ((numRecords % 10000) == 0) {
                context.setStatus("mapper processed " + numRecords + " records so far");
            }
        }
    }

    public static class Reducer1 extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

        public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            Put put = new Put(key.get());
            put.add(Bytes.toBytes("message"), Bytes.toBytes("total"), Bytes.toBytes(sum));
            System.out.println(String.format("stats :   key : %d,  count : %d", Bytes.toInt(key.get()), sum));
            context.write(key, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "Hbase_FreqCounter1");
        job.setJarByClass(FreqCounter1.class);
        Scan scan = new Scan();
        // Only the row keys are needed, so skip everything but each row's first cell
        scan.setFilter(new FirstKeyOnlyFilter());
        TableMapReduceUtil.initTableMapperJob("data_input", scan, Mapper1.class, ImmutableBytesWritable.class,
                IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("data_output", Reducer1.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The logic, roughly:

In the map phase, each row of data_input is read and the user-ID portion of its row key is emitted with the tag 1; anyone familiar with the MapReduce flow should find this easy to follow. Since only row keys matter here, the FirstKeyOnlyFilter set on the Scan returns just the first cell of every row, reducing the amount of data fed to the mappers.

In the shuffle phase, all pairs from the data_input table that share the same key are brought together. The reduce phase then reads how many identical keys arrived, adds them up to get a total, and stores that total in the data_output table. For instance, if user ID 5 occurs in 1,024 rows, the reducer receives (5, [1, 1, ..., 1]) and writes 1024 as a raw 4-byte integer into column message:total of row 5.


Finally, let's verify the results.

Since the counts sit in HBase as raw 4-byte integers (the reducer wrote them with Bytes.toBytes(sum)), they cannot be read directly; the following small program converts the data in HBase into a readable format. The code is as follows:


package hbase_mapred1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class PrintUserCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable htable = new HTable(conf, "data_output");

        Scan scan = new Scan();
        ResultScanner scanner = htable.getScanner(scan);
        Result r;
        while ((r = scanner.next()) != null) {
            // Row key is the 4-byte user ID; message:total holds the 4-byte count
            byte[] key = r.getRow();
            int userId = Bytes.toInt(key);
            byte[] totalValue = r.getValue(Bytes.toBytes("message"), Bytes.toBytes("total"));
            int count = Bytes.toInt(totalValue);
            System.out.println("key: " + userId + ",  count: " + count);
        }
        scanner.close();
        htable.close();
    }
}


Running PrintUserCount produces output like:

key: 1,  count: 1007
key: 2,  count: 1034
key: 3,  count: 962
key: 4,  count: 1001
key: 5,  count: 1024
key: 6,  count: 1033
key: 7,  count: 984
key: 8,  count: 987
key: 9,  count: 988
key: 10,  count: 990
key: 11,  count: 1069
key: 12,  count: 965
key: 13,  count: 1000
key: 14,  count: 998
key: 15,  count: 1002
key: 16,  count: 983
...

As expected, with 100,000 rows and user IDs drawn uniformly from 1 to 100 (maxID = totalRecords / 1000), each key's count lands near 1,000.



Note that the corresponding JARs (HBase, Hadoop, and their dependencies) must be imported into the project. The original post showed the Eclipse project directory structure in a screenshot here.


