当前位置: 代码迷 >> 综合 >> storm wordcount
  详细解决方案

storm wordcount

热度:99   发布时间:2023-12-25 23:17:07.0

准备

前面几篇文章已经介绍了如何搭建storm集群。接下来学习如何编写storm代码,使用maven构建,本地模拟集群测试代码

编码

maven 配置

使用maven来配置需要的jar包,只需要一个0.9.2版本的 storm即可

   <dependencies><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>0.9.2-incubating</version></dependency></dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

创建SentenceSpout

创建sentenceSpout作为storm的数据源,需要继承 BaseRichSpout ,它需要重写几个方法

     /*spout初始化时调用这个方法map包含storm配置信息TopologyContext对象提供了topology中组件的信息SpoutOutputCollector对象提供了发射tuple的方法*/public void open(Map config, TopologyContext context, SpoutOutputCollector collector){this.collector=collector;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

open方法是spout的初始化时会调用的方法,传入该方法的SpoutOutputCollector对象提供了向外发射数据流的方法

 /*声明spout会发射一个数据流,其中的tuple包含一个字段sentence*/public void declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fields("sentence"));};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

declareOutputFields方法声明了该spout会发射一个数据流,该数据流包含一个key,名称为 sentence

   /*Storm通过调用这个方法向输出的collector发射tuple*/public void nextTuple(){if(index < sentences.length){this.collector.emit(new Values(sentences[index]));index++;}else{index=0; //如果不让index归0,sentences只会发送一次}Utils.waitForMillis(1);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

storm会调用spout的nextTuple,因此我们需要在nextTuple中使用collector.emit()方法向外发射数据流。 
完成spout如下:


import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;import java.util.Map;/*** Created by chenhong on 16/1/25.*/
public class SentenceSpout extends BaseRichSpout{
    private SpoutOutputCollector collector;private String[] sentences={"my dog has fleas","i like cold beverages","the dog ate my homework","don't hava a cow man ","i don't think i like fleas"};private int index=0;/*声明spout会发射一个数据流,其中的tuple包含一个字段sentence*/public void declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fields("sentence"));};/*spout初始化时调用这个方法map包含storm配置信息TopologyContext对象提供了topology中组件的信息SpoutOutputCollector对象提供了发射tuple的方法*/public void open(Map config, TopologyContext context, SpoutOutputCollector collector){this.collector=collector;}/*Storm通过调用这个方法向输出的collector发射tuple*/public void nextTuple(){if(index < sentences.length){this.collector.emit(new Values(sentences[index]));index++;}else{index=0; //如果不让index归0,sentences只会发送一次}Utils.waitForMillis(1);}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

创建SplitSentenceBolt

创建SplitSentenceBolt,对spout发射的句子按空格分隔成单词,需要继承BaseRichBolt,它需要重写一下几个方法:

     /*bolt初始化时调用*/public void prepare(Map config ,TopologyContext context,OutputCollector collector){this.collector = collector;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

prepare在bolt初始化时会被调用,类似spout的open方法,OutputCollector用来向外发射一个数据流

    /*声明每个tuple包含一个字段 word*/public void declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fields("word"));}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

declareOutputFields表示该bolt会向外发射一个数据流,该数据流包含一个key,key的名称为 word

  /*每当从订阅的数据流中接收一个tuple,都会调用这个方法*/public void execute(Tuple tuple){String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for(String word : words){this.collector.emit(new Values(word));}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

该方法会在bolt接收到上游发射的数据流后被调用 
完成代码如下:

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;import java.util.Map;/*** Created by chenhong on 16/1/25.*/
public class SplitSentenceBolt extends BaseRichBolt{
    private OutputCollector collector;/*bolt初始化时调用*/public void prepare(Map config ,TopologyContext context,OutputCollector collector){this.collector = collector;}/*声明每个 tuple包含一个字段 word*/public void declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fields("word"));}/*每当从订阅的数据流中接收一个tuple,都会调用这个方法*/public void execute(Tuple tuple){String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for(String word : words){this.collector.emit(new Values(word));}}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

创建WordCountBolt

WordCountBolt用来对SplitSentenceBolt发射的单词进行计数,WordCountBolt需要实现的方法与SplitSentenceBolt类似,因此不再对方法多做解释。完成代码如下:

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;import java.util.HashMap;
import java.util.Map;/*** Created by chenhong on 16/1/25.*/
public class WordCountBolt extends BaseRichBolt{
    private OutputCollector collector;private HashMap<String,Long> counts = null;public void prepare(Map config , TopologyContext context,OutputCollector collector){this.collector = collector;this.counts = new HashMap<String, Long>();}public void execute(Tuple tuple){String word = tuple.getStringByField("word");Long count = this.counts.get(word);if(count ==null){count =0L;}count++;this.counts.put(word,count);this.collector.emit(new Values(word,count));}public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word","count"));}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

创建ReportBolt

ReportBolt作为最后一个Bolt,它的作用是对计数完的单词进行打印或持久化到数据库。因此无需再向外发射数据流。因此它的declareOutputFields不需要声明declare

      /*该bolt位于末端,所以declareOutputFields为空**/public void declareOutputFields(OutputFieldsDeclarer declarer){}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在该计数完成后,将计数结果打印,因此需要重写cleanup方法

      /*cleanup方法用来释放bolt占用的资源*/public void cleanup(){System.out.println("--- FINAL COUNTS ---");List<String> keys = new ArrayList<String>();keys.addAll(this.counts.keySet());Collections.sort(keys);for(String key: keys){System.out.println(key+" : "+this.counts.get(key));}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

完成代码如下:

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;import java.util.*;/*** Created by chenhong on 16/1/25.*/
public class ReportBolt extends BaseRichBolt {
    private HashMap<String,Long> counts =null;public void prepare(Map config, TopologyContext context, OutputCollector collector){this.counts = new HashMap<String, Long>();}public void execute(Tuple tuple){String word = tuple.getStringByField("word");Long count = tuple.getLongByField("count");this.counts.put(word,count);}/*该bolt位于末端,所以declareOutputFields为空**/public void declareOutputFields(OutputFieldsDeclarer declarer){}/*cleanup方法用来释放bolt占用的资源*/public void cleanup(){System.out.println("--- FINAL COUNTS ---");List<String> keys = new ArrayList<String>();keys.addAll(this.counts.keySet());Collections.sort(keys);for(String key: keys){System.out.println(key+" : "+this.counts.get(key));}}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

创建WordCountTopology

现在已经实现了一个 SentenceSpout, SplitSentenceBolt,WordCountBolt,ReportBolt。 我们需要将SentenceSpout作为数据源,将句子发射给SplitSentenceBolt, SplitSentenceBolt将接收到的句子按照空格进行分割成单词发射给WordCountBolt,WordCountBolt对单词进行计数,并将计数结果发送给ReportBolt,注意WordCountBolt的计数结果随着程序进行随时变化,因此ReportBolt中的计数结果也随时更新。在程序结束后,将ReportBolt记录的单词和计数打印出来。完成代码如下:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;/*** Created by chenhong on 16/1/25.*/
public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID="sentence-spout";private static final String SPILL_BOLT_ID ="split-bolt";private static final String COUNT_BOLT_ID ="count-bolt";private static final String REPORT_BOLT_ID="report-bolt";private static final String TOPOLOGY_NAME="word-count-topology";public static void main(String[] args){SentenceSpout spout = new SentenceSpout();SplitSentenceBolt splitBolt = new SplitSentenceBolt();WordCountBolt countBolt = new WordCountBolt();ReportBolt reportBolt = new ReportBolt();TopologyBuilder builder = new TopologyBuilder();//注册一个sentence spout并且赋值给其唯一的IDbuilder.setSpout(SENTENCE_SPOUT_ID, spout);//注册一个splitsentencebolt ,这个bolt订阅sentencespout发射出来的数据流,shuffleGrouping方法告诉//storm要将类sentenceSpout发射的tuple随机均匀地分发给SplitSentenceBolt实例builder.setBolt(SPILL_BOLT_ID,splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);//fieldsGrouping()方法来保证所有 word字段值相同的tuple会被路由到同一个wordcountbolt实例中builder.setBolt(COUNT_BOLT_ID,countBolt).fieldsGrouping(SPILL_BOLT_ID, new Fields("word"));//globalGrouping方法将WordCountBolt发射的所有tuple路由到唯一的ReportBolt任务中builder.setBolt(REPORT_BOLT_ID,reportBolt).globalGrouping(COUNT_BOLT_ID);//config对象代表了对topology所有组件全局生效的配置参数集合,会分发给各个spout和bolt的open(),prepare()方法Config config = new Config();//LocalCluster类在本地开发环境来模拟一个完整的storm集群LocalCluster cluster = new LocalCluster();cluster.submitTopology(TOPOLOGY_NAME,config,builder.createTopology());Utils.waitForSeconds(5);cluster.killTopology(TOPOLOGY_NAME);cluster.shutdown();}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

运行结果

注意单词计数的结果与机器性能有关

a : 432
ate : 432
beverages : 432
cold : 432
cow : 432
dog : 864
don't : 864
fleas : 864
has : 432
hava : 432
homework : 432
i : 1296
like : 864
man : 432
my : 864
the : 432
think : 432