当前位置: 代码迷 >> 综合 >> storm ack机制//
  详细解决方案

Storm ack 机制

热度:54   发布时间:2024-01-10 09:16:46.0
1、原理
Storm 实现了一组特殊的 "acker" task 来跟踪(track)每一个 spout tuple,acker task 的个数可以根据 tuple 的数量来配置。所有被产生的 tuple 都会有一个随机的 64 bit 的 id,用于被跟踪。tuple 之间通过 emit 时的 anchor 形成 tuple tree,并且每个 tuple 都知道产生它的 spout tuple 的 id(该 id 在 emit 时被不断地复制传递)。
Acker 需要为每个 spout tuple 存储 spout tuple id、task id 和 ack val。ack val 是一个 64 bit 的数字,用于表示整个 tuple tree 的状况;它的产生方法是将 tuple tree 中所有 created 和 acked 的 tuple 的 id 进行异或。由于每个 tuple 的 id 在被创建和被 ack 时各参与一次异或,两次异或相互抵消,所以当 ack val 的值为 0 时,即表示整个 tuple tree 已经被处理完成。

2、实现

/**
 * Entry point of the ack demo: wires a three-stage topology
 * (MessageSpout -> SplitBolt -> FileWriteBolt) and submits it to an
 * in-process {@code LocalCluster} under the name "xx".
 */
public class AckMain {

    public static void main(String[] args) {
        // Topology-level configuration; debug output is switched off.
        Config config = new Config();
        config.setDebug(false);

        // Component ids "1", "2", "3" chain the stages via shuffle grouping.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("1", new MessageSpout(), 1);
        builder.setBolt("2", new SplitBolt()).shuffleGrouping("1");
        builder.setBolt("3", new FileWriteBolt()).shuffleGrouping("2");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("xx", config, builder.createTopology());
    }
}

public class MessageSpout implements IRichSpout{private int index=0;private String[]lines=new String []{"0,zero","1,one","2,two","3,three","4,four","5,five","6,six","7,seven","8,eight","9,nine"};private SpoutOutputCollector spoutOutputCollector;@Overridepublic void ack(Object msId) {System.out.println("ack...MessageId........"+msId);}@Overridepublic void fail(Object arg0) {System.out.println("fail....messageId...."+arg0);spoutOutputCollector.emit(new Values(lines[(Integer)arg0]), arg0);}@Overridepublic void nextTuple() {if(index < lines.length){String l=lines[index];spoutOutputCollector.emit(new Values(l), index);index++;}}@Overridepublic void open(Map arg0, TopologyContext arg1, SpoutOutputCollector spoutOutputCollector) {this.spoutOutputCollector=spoutOutputCollector;}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("line"));}}


 
 
public class SplitBolt implements IRichBolt{OutputCollector outputCollector;@Overridepublic void execute(Tuple input) {String line=input.getString(0);String []lines=line.split(",");for(String word:lines){outputCollector.emit(input, new Values(word)); //把接收的sentence进行切分,emit需要实现anchor,即第一个参数必须是Tuple。}outputCollector.ack(input);}@Overridepublic void prepare(Map arg0, TopologyContext arg1, OutputCollector outputCollector) {this.outputCollector=outputCollector;}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("word"));}
}
public class FileWriteBolt implements IRichBolt{OutputCollector outputCollector;private FileWriter fw;int count=0;@Overridepublic void execute(Tuple input) {String word=input.getString(0);if(count==5)outputCollector.fail(input);//当发送第五个单词时触发fail方法。else{try {fw.write(word);fw.write("....");fw.flush();} catch (IOException e) {e.printStackTrace();}outputCollector.ack(input);}count++;}@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.outputCollector=outputCollector;try {fw=new FileWriter("d://000.txt");} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer arg0) {}
}