当前位置: 代码迷 >> 综合 >> Storm 实战入门
  详细解决方案

Storm 实战入门

热度:3   发布时间:2023-12-17 08:00:22.0

Storm 实战入门

 

互联网从诞生的第一时间起,对世界的最大的改变就是让信息能够实时交互,从而大大加速了各个环节的效率。正因为大家对信息实时响应、实时交互的需求,软件行业除了个人操作系统之外,数据库(更精确的说是关系型数据库)应该是软件行业发展最快、收益最为丰厚的产品了。记得十年前,很多银行别说实时转 账,连实时查询都做不到,但是数据库和高速网络改变了这个情况。

随着互联网的更进一步发展,从Portal信息浏览型到Search信息搜索型到SNS关系交互传递型,以及电子商务、互联网旅游生活产品等将生活 中的流通环节在线化。对效率的要求让大家对于实时性的要求进一步提升,而信息的交互和沟通正在从点对点往信息链甚至信息网的方向发展,这样必然带来数据在各个维度的交叉关联,数据爆炸已不可避免。因此流式处理加NoSQL产品应运而生,分别解决实时框架和数据大规模存储计算的问题。

早在7、8年前诸如UC伯克利、斯坦福等大学就开始了对流式数据处理的研究,但是由于更多的关注于金融行业的业务场景或者互联网流量监控的业务场景,以及当时互联网数据场景的限制,造成了研究多是基于对传统数据库处理的流式化,对流式框架本身的研究偏少。目前这样的研究逐渐没有了声音,工业界更多 的精力转向了实时数据库。

2010年Yahoo!对S4的开源,2011年twitter对Storm的开源,改变了这个情况。以前互联网的开发人员在做一个实时应用的时候,除了要关注应用逻辑计算处理本身,还要为了数据的实时流转、交互、分布大伤脑筋。但是现在情况却大为不同,以Storm为例,开发人员可以快速的搭建一套健壮、易用的实时流处理框架,配合SQL产品或者NoSQL产品或者MapReduce计算平台,就可以低成本的做出很多以前很难想象的实时产品:比 如一淘数据部的量子恒道品牌旗下的多个产品就是构建在实时流处理平台上的。

本教程是一本对storm的基础介绍手册,但是我们也希望它不仅仅是一本storm的使用手册,我们会在其中加入更多我们在实际数据生产过程的经验和应用的架构,最后的目的是帮助所有愿意使用实时流处理框架的技术同仁,同时也默默的改变这个世界。

1 Storm特点

Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。Storm有很多使用场景:如实时分析,在线机器学习,持续计算,分布式 RPC,ETL等等。Storm支持水平扩展,具有高容错性,保证每个消息都会得到处理,而且处理速度很快(在一个小集群中,每个结点每秒可以处理数以百万计的消息)。Storm的部署和运维都很便捷,而且更为重要的是可以使用任意编程语言来开发应用。

1.1编程模型简单

在大数据处理方面相信大家对hadoop已经耳熟能详,基于GoogleMap/Reduce来实现的Hadoop为开发者提供了map、reduce原语,使并行批处理程序变得非常地简单和优美。同样,Storm也为大数据的实时计算提供了一些简单优美的原语,这大大降低了开发并行实时处理的任务的复杂性,帮助你快速、高效的开发应用。

1.2可扩展

在Storm集群中真正运行topology的主要有三个实体:工作进程、线程和任务。Storm集群中的每台机器上都可以运行多个工作进程,每个工作进程又可创建多个线程,每个线程可以执行多个任务,任务是真正进行数据处理的实体,我们开发的spout、bolt就是作为一个或者多个任务的方式执行的。因此,计算任务在多个线程、进程和服务器之间并行进行,支持灵活的水平扩展。

1.3高可靠性

Storm可以保证spout发出的每条消息都能被“完全处理”,这也是直接区别于其他实时系统的地方,如S4。请注意,spout发出的消息后续可能会触发产生成千上万条消息,可以形象的理解为一棵消息树,其中spout发出的消息为树根,Storm会跟踪这棵消息树的处理情况,只有当这棵消息树中的所有消息都被处理了,Storm才会认为spout发出的这个消息已经被“完全处理”。如果这棵消息树中的任何一个消息处理失败了,或者整棵消息树在限定的时间内没有“完全处理”,那么spout发出的消息就会重发。

考虑到尽可能减少对内存的消耗,Storm并不会跟踪消息树中的每个消息,而是采用了一些特殊的策略,它把消息树当作一个整体来跟踪,对消息树中所有消息的唯一id进行异或计算,通过是否为零来判定spout发出的消息是否被“完全处理”,这极大的节约了内存和简化了判定逻辑,后面会对这种机制进行详细介绍。这种模式,每发送一个消息,都会同步发送一个ack/fail,对于网络的带宽会有一定的消耗,如果对于可靠性要求不高,可通过使用不同的emit接口关闭该模式。上面所说的,Storm保证了每个消息至少被处理一次,但是对于有些计算场合,会严格要求每个消息只被处理一次,幸而Storm的0.7.0引入了事务性拓扑,解决了这个问题,后面会有详述。

 1.4高容错性

如果在消息处理过程中出了一些异常,Storm会重新安排这个出问题的处理单元。Storm保证一个处理单元永远运行(除非你显式杀掉这个处理单元)。当然,如果处理单元中存储了中间状态,那么当处理单元重新被Storm启动的时候,需要应用自己处理中间状态的恢复。

1.5支持多种编程语言

除了用java实现spout和bolt,你还可以使用任何你熟悉的编程语言来完成这项工作,这一切得益于Storm所谓的多语言协议。多语言协议是Storm内部的一种特殊协议,允许spout或者bolt使用标准输入和标准输出来进行消息传递,传递的消息为单行文本或者是json编码的多行。

Storm支持多语言编程主要是通过ShellBolt,ShellSpout和ShellProcess这些类来实现的,这些类都实现了IBolt 和 ISpout接口,以及让shell通过java的ProcessBuilder类来执行脚本或者程序的协议。

可以看到,采用这种方式,每个tuple在处理的时候都需要进行json的编解码,因此在吞吐量上会有较大影响。

1.6支持本地模式

Storm有一种“本地模式”,也就是在进程中模拟一个Storm集群的所有功能,以本地模式运行topology跟在集群上运行topology类似,这对于我们开发和测试来说非常有用。

1.7高效

用ZeroMQ作为底层消息队列, 保证消息能快速被处理


 

2 Storm基本概念

在运行一个Storm任务之前,需要了解一些概念:

1.       Topologies

2.       Streams

3.       Spouts

4.       Bolts

5.       Stream groupings

6.       Reliability

7.       Tasks

8.       Workers

9.       Configuration

Storm集群和Hadoop集群表面上看很类似。但是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这两者之间是非常不一样的。一个关键的区别是:一个MapReduce job最终会结束,而一个topology永远会运行(除非你手动kill掉)。

在Storm的集群里面有两种节点: 控制节点(masternode)和工作节点(worker node)。控制节点上面运行一个叫Nimbus后台程序,它的作用类似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器,并且监控状态。

每一个工作节点上面运行一个叫做Supervisor的节点。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程。每一个工作进程执行一个topology的一个子集;一个运行的topology由运行在很多机器上的很多工作进程组成。


图 2-1 topology工作进程

Nimbus和Supervisor之间的所有协调工作都是通过Zookeeper集群完 成。另外,Nimbus进程和Supervisor进程都是快速失败(fail-fast)和无状态的。所有的状态要么在zookeeper里面,要么在本地磁盘上。这也就意味着你可以用kill -9来杀死Nimbus和Supervisor进程,然后再重启它们,就好像什么都没有发生过。这个设计使得Storm异常的稳定。

2.1 Topologies

一个topology是spouts和bolts组成的图, 通过stream groupings将图中的spouts和bolts连接起来,如下图:


图2-2 topology

一个topology会一直运行直到你手动kill掉,Storm自动重新分配执行失败的任务, 并且Storm可以保证你不会有数据丢失(如果开启了高可靠性的话)。如果一些机器意外停机它上面的所有任务会被转移到其他机器上。

运行一个topology很简单。首先,把你所有的代码以及所依赖的jar打进一个jar包。然后运行类似下面的这个命令:

storm jar all-my-code.jar backtype.storm.MyTopologyarg1 arg2

这个命令会运行主类:backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main函数定义这个topology并且把它提交给Nimbus。storm jar负责连接到Nimbus并且上传jar包。

Topology的定义是一个Thrift结构,并且Nimbus就是一个Thrift服务, 你可以提交由任何语言创建的topology。上面的方面是用JVM-based语言提交的最简单的方法。

2.2 Streams

消息流stream是storm里的关键抽象。一个消息流是一个没有边界的tuple 序列,而这些tuple序列会以一种分布式的方式并行地创建和处理。通过对stream中tuple序列中每个字段命名来定义stream。在默认的情况下,tuple的字段类型可以是:integer,long,short, byte,string,double,float,boolean和byte array。你也可以自定义类型(只要实现相应的序列化器)。

每个消息流在定义的时候会被分配给一个id,因为单向消息流使用的相当普遍, OutputFieldsDeclarer定义了一些方法让你可以定义一个stream而不用指定这个id。在这种情况下这个stream会分配个值为‘default’默认的id 。

Storm提供的最基本的处理stream的原语是spout和bolt。你可以实现spout和bolt提供的接口来处理你的业务逻辑。

2.3 Spouts

消息源spout是Storm里面一个topology里面的消息生产者。一般来说消 息源会从一个外部源读取数据并且向topology里面发出消息:tuple。Spout可以是可靠的也可以是不可靠的。如果这个tuple没有被 storm成功处理,可靠的消息源spouts可以重新发射一个tuple,但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。

消息源可以发射多条消息流stream。使用OutputFieldsDeclarer.declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。

Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple。要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。

另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。

2.4 Bolts

所有的消息处理逻辑被封装在bolts里面。Bolts可以做很多事情:过滤,聚合,查询数据库等等。

Bolts可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。

Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。

Bolts的主要方法是execute,它以一个tuple作为输入,bolts使用OutputCollector来发射tuple,bolts必须要为它处理的每一个tuple调用 OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。一般的流程是: bolts处理一个输入tuple,  发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。

2.5 Stream groupings

定义一个topology的其中一步是定义每个bolt接收什么样的流作为输入。streamgrouping就是用来定义一个stream应该如果分配数据给bolts上面的多个tasks。

Storm里面有7种类型的streamgrouping:

1.       Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。

2.       Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts里的一个task, 而不同的userid则会被分配到不同的bolts里的task。

3.       All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。

4.       Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。

5.       Non Grouping:不分组,这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果,有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。

6.       Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过 TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。

7.       Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的ShuffleGrouping行为一致。

2.6 Reliability

Storm保证每个tuple会被topology完整的执行。Storm会追踪由每 个spout tuple所产生的tuple树(一个bolt处理一个tuple之后可能会发射别的tuple从而形成树状结构),并且跟踪这棵tuple树什么时候成功处理完。每个topology都有一个消息超时的设置,如果storm在这个超时的时间内检测不到某个tuple树到底有没有执行成功,那么topology会把这个tuple标记为执行失败,并且过一会儿重新发射这个tuple。

为了利用Storm的可靠性特性,在你发出一个新的tuple以及你完成处理一个 tuple的时候你必须要通知storm。这一切是由OutputCollector来完成的。通过emit方法来通知一个新的tuple产生了,通过 ack方法通知一个tuple处理完成了。

Storm的可靠性我们在第四章会深入介绍。

2.7 Tasks

每一个spout和bolt会被当作很多task在整个集群里执行。每一个executor对应到一个线程,在这个线程上运行多个task,而stream grouping则是定义怎么从一堆task发射tuple到另外一堆task。你可以调用TopologyBuilder类的setSpout和 setBolt来设置并行度(也就是有多少个task)。

2.8 Workers

一个topology可能会在一个或者多个worker(工作进程)里面执行,每个worker是一个物理JVM并且执行整个topology的一部分。比如,对于并行度是300的topology来说,如果我们使用50个工作进程来执行,那么每个工作进程会处理其中的6个tasks。Storm会尽量均匀的工作分配给所有的worker。

2.9 Configuration

Storm里面有一堆参数可以配置来调整Nimbus, Supervisor以及正在运行的topology的行为,一些配置是系统级别的,一些配置是topology级别的。default.yaml里面有所有的默认配置。你可以通过定义个storm.yaml在你的classpath里来覆盖这些默认配置。并且你也可以在代码里面设置一些topology 相关的配置信息(使用StormSubmitter)。

3构建Topology

3.1 实现的目标

我们将设计一个topology,来实现对一个句子里面的单词出现的频率进行统计。这是一个简单的例子,目的是让大家对于topology快速上手,有一个初步的理解。

3.2 设计Topology结构

在开始开发Storm项目的第一步,就是要设计topology。确定好你的数据处理逻辑,我们今天将的这个简单的例子,topology也非常简单。整个topology如下:




图3-1 topology逻辑组成

 

整个topology分为三个部分:

KestrelSpout:数据源,负责发送sentence

Splitsentence:负责将sentence切分

Wordcount:负责对单词的频率进行累加

3.3 设计数据流

这个topology从kestrelqueue读取句子,并把句子划分成单词,然后汇总每个单词出现的次数,一个tuple负责读取句子,每一个tuple分别对应计算每一个单词出现的次数,大概样子如下所示:



图3-2 数据流

3.4代码实现

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(1, new KestrelSpout(“kestrel.backtype.com”,22133,

”sentence_queue”, new StringScheme()));

builder.setBolt(2, new SplitSentence(), 10)

      .shuffleGrouping(1);

builder.setBolt(3, new WordCount(), 20)

      .fieldsGrouping(2, new Fields(“word”));

这种topology的spout从句子队列中读取句子,在kestrel.backtype.com位于一个Kestrel的服务器端口22133。

Spout用setSpout方法插入一个独特的id到topology。 Topology中的每个节点必须给予一个id,id是由其他bolts用于订阅该节点的输出流。 KestrelSpout在topology中id为1。

setBolt是用于在Topology中插入bolts。 在topology中定义的第个bolts 是切割句子的bolts。 这个bolts 将句子流转成成单词流。

让我们看看SplitSentence实施:

public classSplitSentence implements IBasicBolt{

      public void prepare(Map conf, TopologyContext context) {

        }

      public void execute(Tuple tuple, BasicOutputCollector collector) {

            String sentence = tuple.getString(0);

             for(String word: sentence.split(“ ”)) {

                      collector.emit(new Values(word));

                 }

            }

        public void cleanup() {

       }

      public void declareOutputFields(OutputFieldsDeclarer declarer) {

              declarer.declare(new Fields(“word”));

            }

 }

关键的方法 execute方法。 正如你可以看到,它将句子拆分成单词,并发出每个单词作为一个新的元组。 另一个重要的方法是declareOutputFields,其中宣布bolts输出元组的架构。 在这里宣布,它发出一个域为word的元组

setBolt的最后一个参数是你想为bolts的并行量。 SplitSentence bolts 是10个并发,这将导致在storm集群中有十个线程并行执行。 你所要做的的是增加bolts的并行量在遇到topology的瓶颈时。

 setBolt方法返回一个对象,用来定义bolts的输入。 例如,SplitSentence螺栓订阅组件“1”使用随机分组的输出流。 “1”是指已经定义KestrelSpout。 我将解释在某一时刻的随机分组的一部分。 到目前为止,最要紧的是,SplitSentence bolts会消耗KestrelSpout发出的每一个元组。

下面在让我们看看wordcount的实现:

public classWordCount implements IBasicBolt {

       private Map<String, Integer> _counts = newHashMap<String, Integer>();

       public void prepare(Map conf, TopologyContext context) {

       }

      public void execute(Tuple tuple, BasicOutputCollector collector) {

             String word = tuple.getString(0);

             int count;

             if(_counts.containsKey(word)) {

                    count =_counts.get(word);

             } else {

                    count = 0;

}

             count++;

             _counts.put(word, count);

             collector.emit(new Values(word,count));

      }

      public void cleanup() {

      }

      public void declareOutputFields(OutputFieldsDeclarer declarer) {

             declarer.declare(new Fields(“word”,“count”));

      }

}

SplitSentence对于句子里面的每个单词发射一个新的tuple, WordCount在内存里面维护一个单词->次数的mapping, WordCount每收到一个单词,它就更新内存里面的统计状态。

3.5 运行Topology

storm的运行有两种模式: 本地模式和分布式模式.

1) 本地模式:

storm用一个进程里面的线程来模拟所有的spout和bolt. 本地模式对开发和测试来说比较有用。你运行storm-starter里面的topology的时候它们就是以本地模式运行的,你可以看到topology里面的每一个组件在发射什么消息。

2) 分布式模式:

storm由一堆机器组成。当你提交topology给master的时候, 你同时也把topology的代码提交了。master负责分发你的代码并且负责给你的topolgoy分配工作进程。如果一个工作进程挂掉了, master节点会把认为重新分配到其它节点。

3) 下面是以本地模式运行的代码:

        Config conf = new Config();

        conf.setDebug(true);

        conf.setNumWorkers(2);

        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology(“test”, conf,builder.createTopology());

         Utils.sleep(10000);

         cluster.killTopology(“test”);

         cluster.shutdown();

首先, 这个代码定义通过定义一个LocalCluster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集群是一样的。通过调用submitTopology方法来提交topology,它接受三个参数:要运行的topology的名字,一个配置对象以及要运行的topology本身。

topology的名字是用来唯一区别一个topology的,这样你然后可以用这个名字来杀死这个topology的。前面已经说过了,你必须显式的杀掉一个topology,否则它会一直运行。

Conf对象可以配置很多东西, 下面两个是最常见的:

 TOPOLOGY_WORKERS(setNumWorkers)定义你希望集群分配多少个工作进程给你来执行这个topology. topology里面的每个组件会被需要线程来执行。每个组件到底用多少个线程是通过setBolt和setSpout来指定的。这些线程都运行在工作进程里面. 每一个工作进程包含一些节点的一些工作线程。比如,如果你指定300个线程,60个进程,那么每个工作进程里面要执行6个线程,而这6个线程可能属于不同的组件(Spout, Bolt)。你可以通过调整每个组件的并行度以及这些线程所在的进程数量来调整topology的性能。

 TOPOLOGY_DEBUG(setDebug),当它被设置成true的话, storm会记录下每个组件所发射的每条消息。这在本地环境调试topology很有用,但是在线上这么做的话会影响性能的。

4 storm的一些常见模式

1.       流聚合(stream join)

2.       批处理(Batching)

3.       BasicBolt

4.       内存内缓存 + fields grouping 组合

5.       计算top N

6.       用TimeCacheMap来高效地保存一个最近被更新的对象的缓存

7.       分布式RPC: CoordinatedBolt和KeyedFairBolt

4.1流聚合(stream join)

聚合把两个或者多个数据流聚合成一个数据流— 基于一些共同的tuple字段。流聚合和SQL里面table join很像,只是table join的输入是有限的,并且join的语义是非常明确的。而流聚合的语义是不明确的并且输入流是无限的。

流类型的聚合类型跟具体的应用是有关了。一些应用把两个流发出的所有的tuple都聚合起来 — 不管多长时间;而另外一些应用则只会聚合一些特定的tuple。而另外一些应用的聚合逻辑又可能完全不一样。而这些聚合类型里面最常见的类型是把所有的输入流进行一样的划分,这个在storm里面用fields grouping在相同字段上进行grouping就可以了,比如:

builder.setBolt(5,new MyJoiner(), parallelism)

    .fieldsGrouping(1,new Fields("field1", "field2"))

    .fieldsGrouping(2,new Fields("field1", "field2"))

    .fieldsGrouping(3,new Fields("field1", "field2"));

当然,不同的数据流的“相同”字段可以有不一样的名字。

4.2批处理(Batching)

有时候为了性能或者一些别的原因, 你可能想把一组tuple一起处理, 而不是一个个单独处理。比如,你可能想批量更新数据库。

如果你想让你的数据处理具有可靠性,正确的方式是保存这些tuple对象的引用直到bolt批量处理这些tuple了。一旦这个批量操作结束,你可以批量的ack这些tuple。

如果一个bolt发射tuple, 那么你可能想用multi-anchoring来保证可靠性。这一切都取决于具体的应用。关于storm的消息传递的工作原理可以看这篇: Twitter Storm如何保证消息不丢失

4.3 BasicBolt

很多bolt有些类似的模式:

读一个输入tuple

根据这个输入tuple发射一个或者多个tuple

在execute的方法的最后ack那个输入tuple

遵循这类模式的bolt一般是函数或者是过滤器, 这种模式太常见,storm为这类模式单独封装了一个接口: IBasicBolt。更多的信息请看: TwitterStorm如何保证消息不丢失

 4.4 内存内缓存 + fieldsgrouping 组合

在bolt的内存里面缓存一些东西非常常见。缓存在和fields grouping结合起来之后就更有用了。比如,你有一个bolt把短链接变成长链接(bit.ly, t.co之类的)。你可以把短链接到长链接的对应关系利用LRU算法缓存在内存里面以避免重复计算。比如组件一发射短链接,组件二把短链接转化成长链接并缓存在内存里面。看一下下面两段代码有什么不一样:

builder.setBolt(2,new ExpandUrl(), parallelism)

  .shuffleGrouping(1);

builder.setBolt(2,new ExpandUrl(), parallelism)

  .fieldsGrouping(1,new Fields("url"));

第二种方式的缓存会比第一种方式的缓存的效率高很多,因为同样的短链接始终被发到同一个task。这会避免不同的机器上有同样的缓存  — 浪费内存,同时也使得同样的短域名更可能在内存里面找到缓存。

4.5计算top N

storm的一个常见的持续计算的模式叫做: “streaming top N”。

比如你有一个bolt发射这样的tuple:["value", "count"]并且你想一个bolt基于这些信息算出top N的tuple。最简单的办法是有一个bolt可以做一个全局的grouping的动作并且在内存里面保持这topN的值。

这个方式对于大数据量的流显然是没有扩展性的, 因为所有的数据会被发到同一台机器, 单机的处理能力始终是有极限的。一个更好的方法是在多台机器上面并行的计算这个流每一部分的top N, 然后再有一个bolt合并这些机器上面所算出来的top N以算出最后的top N(Map Reduce的思想), 代码大概是这样的:

builder.setBolt(2,new RankObjects(), parallellism)

  .fieldsGrouping(1,new Fields("value"));

builder.setBolt(3,new MergeObjects())

  .globalGrouping(2);

这个模式之所以可行是因为第一个bolt的fieldsgrouping使得这种并行算法在语义上是正确的。

用TimeCacheMap来高效地保存一个最近被更新的对象的缓存

有时候你想在内存里面保存一些最近活跃的对象,以及让那些不再活跃的对象自动过期(删除掉)。TimeCacheMap是一个非常高效的数据结构,它提供了一些callback函数使得我们在对象不再活跃的时候做一些事情。关于TimeCacheMap为什么高效,可以看看这篇分析文章

4.6分布式RPC: CoordinatedBolt和KeyedFairBolt

用storm做分布式RPC应用的时候有两种比较常见的模式:它们被封装在 CoordinatedBoltKeyedFairBolt里面。

CoordinatedBolt包装你的bolt,并且确定什么时候你的bolt已经接收到所有的tuple。它主要使用DirectStream来做这个。

KeyedFairBolt同样包装你的bolt并且保证你的topology同时处理多个DRPC调用,而不是串行地一次只执行一个。

5 消息的可靠处理

5.1 简介

storm可以确保spout发送出来的每个消息都会被完整的处理。本章将会描述storm体系是如何达到这个目标的,并将会详述开发者应该如何使用storm的这些机制来实现数据的可靠处理。

5.2 理解消息被完整处理

一个消息(tuple)从spout发送出来,可能会导致成百上千的消息基于此消息被创建。

我们来思考一下流式的“单词统计”的例子:

storm任务从数据源(Kestrelqueue)每次读取一个完整的英文句子;将这个句子分解为独立的单词,最后,实时的输出每个单词以及它出现过的次数。

本例中,每个从spout发送出来的消息(每个英文句子)都会触发很多的消息被创建,那些从句子中分隔出来的单词就是被创建出来的新消息。

这些消息构成一个树状结构,我们称之为“tuple tree”,看起来如图1所示:


图 5-1 tuple tree


 

在什么条件下,Storm才会认为一个从spout发送出来的消息被完整处理呢?答案就是下面的条件同时被满足:

tuple tree不再生长

树中的任何消息被标识为“已处理”

如果在指定的时间内,一个消息衍生出来的tupletree未被完全处理成功,则认为此消息未被完整处理。这个超时值可以通过任务级参数Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS进行配置,默认超时值为30秒。

5.3消息的生命周期

如果消息被完整处理或者未被完整处理,Storm会如何进行接下来的操作呢?为了弄清这个问题,我们来研究一下从spout发出来的消息的生命周期。这里列出了spout应该实现的接口:



首先, Storm使用spout实例的nextTuple()方法从spout请求一个消息(tuple)。 收到请求以后,spout使用open方法中提供的SpoutOutputCollector向它的输出流发送一个或多个消息。每发送一个消 息,Spout会给这个消息提供一个messageID,它将会被用来标识这个消息。

假设我们从kestrel队列中读取消息,Spout会将kestrel 队列为这个消息设置的ID作为此消息的messageID。 向SpoutOutputCollector中发送消息格式如下:

接来下,这些消息会被发送到后续业务处理的bolts, 并且Storm会跟踪由此消息产生出来的新消息。当检测到一个消息衍生出来的tuple tree被完整处理后,Storm会调用Spout中的ack方法,并将此消息的messageID作为参数传入。同理,如果某消息处理超时,则此消息对应的Spout的fail方法会被调用,调用时此消息的messageID会被作为参数传入。

注意:一个消息只会由发送它的那个spout任务来调用ack或fail。如果系统中某个spout由多个任务运行,消息也只会由创建它的spout任务来应答(ack或fail),决不会由其他的spout任务来应答。

我们继续使用从kestrel队列中读取消息的例子来阐述高可靠性下spout需要做些什么(假设这个spout的名字是KestrelSpout)。

我们先简述一下kestrel消息队列:

当KestrelSpout从kestrel队列中读取一个消息,表示它“打开”了队列中某个消息。这意味着,此消息并未从队列中真正的删除,而是将此消息设置为“pending”状态,它等待来自客户端的应答,被应答以后,此消息才会被真正的从队列中删除。处于“pending”状态的消息不会被其他的客户端看到。另外,如果一个客户端意外的断开连接,则由此客户端“打开”的所有消息都会被重新加入到队列中。当消息被“打开”的时候,kestrel队列同时会为这个消息提供一个唯一的标识。

KestrelSpout就是使用这个唯一的标识作为这个tuple的messageID的。稍后当ack或fail被调用的时候,KestrelSpout会把ack或者fail连同messageID一起发送给kestrel队列,kestrel会将消息从队列中真正删除或者将它重新放回队列中。

5.4可靠相关的API

为了使用Storm提供的可靠处理特性,我们需要做两件事情:

无论何时在tuple tree中创建了一个新的节点,我们需要明确的通知Storm;

当处理完一个单独的消息时,我们需要告诉Storm这棵tuple tree的变化状态。

通过上面的两步,storm就可以检测到一个tupletree何时被完全处理了,并且会调用相关的ack或fail方法。Storm提供了简单明了的方法来完成上述两步。

为tuple tree中指定的节点增加一个新的节点,我们称之为锚定(anchoring)。锚定是在我们发送消息的同时进行的。为了更容易说明问题,我们使用下面代码作为例子。本示例的bolt将包含整句话的消息分解为一系列的子消息,每个子消息包含一个单词。



每个消息都通过这种方式被锚定:把输入消息作为emit方法的第一个参数。因为word消息被锚定在了输入消息上,这个输入消息是spout发送过来的 tuple tree的根节点,如果任意一个word消息处理失败,派生这个tuple tree那个spout 消息将会被重新发送。

与此相反,我们来看看使用下面的方式emit消息时,Storm会如何处理:


如果以这种方式发送消息,将会导致这个消息不会被锚定。如果此tuple tree中的消息处理失败,派生此tuple tree的根消息不会被重新发送。根据任务的容错级别,有时候很适合发送一个非锚定的消息。

一个输出消息可以被锚定在一个或者多个输入消息上,这在做join或聚合的时候是很有用的。一个被多重锚定的消息处理失败,会导致与之关联的多个spout消息被重新发送。多重锚定通过在emit方法中指定多个输入消息来实现:

多重锚定会将被锚定的消息加到多棵tupletree上。

注意:多重绑定可能会破坏传统的树形结构,从而构成一个DAGs(有向无环图),如图2所示:


图5-2 多重锚定构成的钻石型结构

Storm的实现可以像处理树那样来处理DAGs。

锚定表明了如何将一个消息加入到指定的tupletree中,高可靠处理API的接下来部分将向您描述当处理完tuple tree中一个单独的消息时我们该做些什么。这些是通过OutputCollector 的ack和fail方法来实现的。回头看一下例子SplitSentence,可以发现当所有的word消息被发送完成后,输入的表示句子的消息会被应答(acked)。

每个被处理的消息必须表明成功或失败(acked或者failed)。Storm是使用内存来跟踪每个消息的处理情况的,如果被处理的消息没有应答的话,迟早内存会被耗尽!>

很多bolt遵循特定的处理流程: 读取一个消息、发送它派生出来的子消息、在execute结尾处应答此消息。一般的过滤器(filter)或者是简单的处理功能都是这类的应用。 Storm有一个BasicBolt接口封装了上述的流程。示例SplitSentence可以使用BasicBolt来重写:


使用这种方式,代码比之前稍微简单了一些,但是实现的功能是一样的。发送到BasicOutputCollector的消息会被自动的锚定到输入消息,并且,当execute执行完毕的时候,会自动的应答输入消息。

很多情况下,一个消息需要延迟应答,例如聚合或者是join。只有根据一组输入消息得到一个结果之后,才会应答之前所有的输入消息。并且聚合和join大部分时候对输出消息都是多重锚定。然而,这些特性不是IBasicBolt所能处理的。

5.5高效的实现tuple tree

Storm 系统中有一组叫做“acker”的特殊的任务,它们负责跟踪DAG(有向无环图)中的每个消息。每当发现一个DAG被完全处理,它就向创建这个根消息的spout任务发送一个信号。拓扑中acker任务的并行度可以通过配置参数Config.TOPOLOGY_ACKERS来设置。默认的acker任务并行度为1,当系统中有大量的消息时,应该适当提高acker任务的并发度。

为了理解Storm可靠性处理机制,我们从研究一个消息的生命周期和tuple tree的管理入手。当一个消息被创建的时候(无论是在spout还是bolt中),系统都为该消息分配一个64bit的随机值作为id。这些随机的id 是acker用来跟踪由spout消息派生出来的tuple tree的。

每个消息都知道它所在的tuple tree对应的根消息的id。每当bolt新生成一个消息,对应tupletree中的根消息的messageId就拷贝到这个消息中。当这个消息被应答的时候,它就把关于tuple tree变化的信息发送给跟踪这棵树的acker。例如,他会告诉acker:本消息已经处理完毕,但是我派生出了一些新的消息,帮忙跟踪一下吧。

举个例子,假设消息D和E是由消息C派生出来的,这里演示了消息C被应答时,tupletree是如何变化的。


因为在C被从树中移除的同时D和E会被加入到tupletree中,因此tuple tree不会被过早的认为已完全处理。

关于Storm如何跟踪tupletree,我们再深入的探讨一下。前面说过系统中可以有任意个数的acker,那么,每当一个消息被创建或应答的时候,它怎么知道应该通知哪个acker呢?

系统使用一种哈希算法来根据spout消息的messageId确定由哪个acker跟踪此消息派生出来的tupletree。因为每个消息都知道与之对应的根消息的messageId,因此它知道应该与哪个acker通信。

当spout发送一个消息的时候,它就通知对应的acker一个新的根消息产生了,这时acker就会创建一个新的tuple tree。当acker发现这棵树被完全处理之后,他就会通知对应的spout任务。

tuple是如何被跟踪的呢?系统中有成千上万的消息,如果为每个spout发送的消息都构建一棵树的话,很快内存就会耗尽。所以,必须采用不同的策略来跟踪每个消息。由于使用了新的跟踪算法,Storm只需要固定的内存(大约20字节)就可以跟踪一棵树。这个算法是 storm正确运行的核心,也是storm最大的突破。

acker任务保存了spout消息id到一对值的映射。第一个值就是spout的任务id,通过这个id,acker 就知道消息处理完成时该通知哪个spout任务。第二个值是一个64bit的数字,我们称之为“ack val”,它是树中所有消息的随机id的异或结果。ack val表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的消息id发送过来做异或。

每当acker发现一棵树的ack val值为0的时候,它就知道这棵树已经被完全处理了。因为消息的随机ID是一个64bit的值,因此ack val在树处理完之前被置为0的概率非常小。假设你每秒钟发送一万个消息,从概率上说,至少需要50,000,000年才会有机会发生一次错误。即使如此,也只有在这个消息确实处理失败的情况下才会有数据的丢失!

5.6选择合适的可靠性级别

Acker任务是轻量级的,所以在拓扑中并不需要太多的acker存在。可以通过Storm UI来观察acker任务的吞吐量,如果看上去吞吐量不够的话,说明需要添加额外的acker。

如果你并不要求每个消息必须被处理(你允许在处理过程中丢失一些信息),那么可以关闭消息的可靠处理机制,从而可以获取较好的性能。关闭消息的可靠处理机制意味着系统中的消息数会减半(每个消息不需要应答了)。另外,关闭消息的可靠处理可以减少消息的大小(不需要每个tuple记录它的根id了),从而节省带宽。

有三种方法可以关系消息的可靠处理机制:

1.       将参数Config.TOPOLOGY_ACKERS设置为0,通过此方法,当Spout发送一个消息的时候,它的ack方法将立刻被调用;

2.       第二个方法是Spout发送一个消息时,不指定此消息的messageID。当需要关闭特定消息可靠性的时候,可以使用此方法;

3.       最后,如果你不在意某个消息派生出来的子孙消息的可靠性,则此消息派生出来的子消息在发送时不要做锚定,即在emit方法中不指定输入消息。因为这些子孙消息没有被锚定在任何tuple tree中,因此他们的失败不会引起任何spout重新发送消息。

5.7 集群的各级容错

到现在为止,大家已经理解了Storm的可靠性机制,并且知道了如何选择不同的可靠性级别来满足需求。接下来我们研究一下Storm如何保证在各种情况下确保数据不丢失。

5.7.1 任务级失败

1.       因为bolt任务crash引起的消息未被应答。此时,acker中所有与此bolt任务关联的消息都会因为超时而失败,对应spout的fail方法将被调用。

2.       acker任务失败。如果acker任务本身失败了,它在失败之前持有的所有消息都将会因为超时而失败。Spout的fail方法将被调用。

3.       Spout任务失败。这种情况下,Spout任务对接的外部设备(如MQ)负责消息的完整性。例如当客户端异常的情况下,kestrel队列会将处于pending状态的所有的消息重新放回到队列中。

5.7.2 任务槽(slot) 故障

1.       worker失败。每个worker中包含数个bolt(或spout)任务。supervisor负责监控这些任务,当worker失败后,supervisor会尝试在本机重启它。

2.       supervisor失败。supervisor是无状态的,因此supervisor的失败不会影响当前正在运行的任务,只要及时的将它重新启动即可。supervisor不是自举的,需要外部监控来及时重启。

3.       nimbus失败。nimbus是无状态的,因此nimbus的失败不会影响当前正在运行的任务(nimbus失败时,无法提交新的任务),只要及时的将它重新启动即可。nimbus不是自举的,需要外部监控来及时重启。

5.7.3 集群节点(机器)故障

1.       storm集群中的节点故障。此时nimbus会将此机器上所有正在运行的任务转移到其他可用的机器上运行。

2.       zookeeper集群中的节点故障。zookeeper保证少于半数的机器宕机仍可正常运行,及时修复故障机器即可。

5.8 小结

storm集群如何实现数据的可靠处理。借助于创新性的tuple tree跟踪技术,storm高效的通过数据的应答机制来保证数据不丢失。

storm集群中除nimbus外,没有单点存在,任何节点都可以出故障而保证数据不会丢失。nimbus被设计为无状态的,只要可以及时重启,就不会影响正在运行的任务。

6 Understanding the Parallelism of a Storm Topology

In the past fewdays I have been test-driving Twitter’s Stormproject, which is a distributed real-time data processing platform. One of myfindings so far has been that the quality of Storm’s documentation and examplecode is pretty good – it is very easy to get up and running with Storm. Bigprops to the Storm developers! At the same time, I found the sections on how aStorm topology runs in a cluster not perfectly clear, and learned that therecent releases of Storm changed some of its behavior in a way that is not yetfully reflected in the Storm wiki and in the API docs.

In this articleI want to share my own understanding of the parallelism of a Storm topologyafter reading the documentation and writing some first prototype code. Morespecifically, I describe the relationships of worker processes, executors (threads)and tasks, and how you can configure them according to your needs. This articleis based on Storm release 0.8.1, the latest version as of October 2012.

6.1 What is Storm?

For thosereaders unfamiliar with Storm here is a briefdescription taken from its homepage:

Storm is a freeand open source distributed realtime computation system. Storm makes it easy toreliably process unbounded streams of data, doing for realtime processing whatHadoop did for batch processing. Storm is simple, can be used with anyprogramming language, and is a lot of fun to use!

Storm has manyuse cases: realtime analytics, online machine learning, continuous computation,distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over amillion tuples processed per second per node. It is scalable, fault-tolerant,guarantees your data will be processed, and is easy to set up and operate.

6.2 What makes a running topology: worker processes,executors and tasks

Storm distinguishesbetween the following three main entities that are used to actually run atopology in a Storm cluster:

1.       Worker processes

2.       Executors (threads)

3.       Tasks

Here is a simpleillustration of their relationships:


Figure 1: Therelationships of worker processes, executors (threads) and tasks in Storm

A workerprocess executes a subset of a topology, and runs in its own JVM. A workerprocess belongs to a specific topology and may run one or more executors forone or more components (spouts or bolts) of this topology. A running topologyconsists of many such processes running on many machines within a Stormcluster.

An executoris a thread that is spawned by a worker process and runs within the worker’sJVM. An executor may run one or more tasks for the same component (spout orbolt). An executor always has one thread that it uses for all of its tasks,which means that tasks run serially on an executor.

A taskperforms the actual data processing and is run within its parent executor’sthread of execution. Each spout or bolt that you implement in your codeexecutes as many tasks across the cluster. The number of tasks for a componentis always the same throughout the lifetime of a topology, but the number ofexecutors (threads) for a component can change over time. This means that thefollowing condition holds true: #threads <= #tasks. By default, the numberof tasks is set to be the same as the number of executors, i.e. Storm will runone task per thread (which is usually what you want anyways).

Also be awarethat:

1.       The number of executor threadscan be changed after the topology has been started (seestorm rebalance command below).

2.       The number of tasks of atopology is static.

See Understandingthe Internal Message Buffers of Storm for another view on the variousthreads that are running within the lifetime of a worker process and itsassociated executors and tasks.

6.3 Configuring the parallelism of a topology

Note that inStorm’s terminology “parallelism” is specifically used to describe theso-calledparallelism hint, which means theinitial number of executors (threads) of a component. In this article though Iuse the term “parallelism” in a more general sense to describe how you canconfigure not only the number of executors but also the number of workerprocesses and the number of tasks of a Storm topology. I will specifically callout when “parallelism” is used in the narrow definition of Storm.

The followingtable gives an overview of the various configuration options and how to setthem in your code. There is more than one way of setting these options though,and the table lists only some of them. Storm currently has the followingorderof precedence for configuration settings: external component-specificconfiguration > internal component-specific configuration >topology-specific configuration >storm.yaml >defaults.yaml. Please take a look at the Storm documentation for more details.

 

Here is an example code snippet to show these settingsin practice:


In the abovecode we configured Storm to run the bolt GreenBolt with an initial number of two executors and four associated tasks.Storm will run two tasks per executor (thread). If you do not explicitlyconfigure the number of tasks, Storm will run by default one task per executor.

6.4 Configuring parallelism on multi-tenant Stormclusters

Storm 0.8.2 introducedthe Isolation Scheduler that makes it easy and safe to share a clusteramong many topologies, i.e. it solves the multi-tenancy problem — avoidingresource contention between topologies — by providing full isolation betweentopologies.

When you use theisolation scheduler Nathan recommends you set num workers to a multiple ofnumber of machines. And parallelism hint to a multiple of the number ofworkers. If you do call setNumTasks() (which most people don’t), you should setthat to a multiple of the parallelism hint. If you do this, then what happensis your workload is uniform distributed. Each machine and jvm process will havethe same number of threads, and roughly the same amount of work.

Jason Jackson on storm-user grokbase.com/t/gg/storm-user/…

6.5 Example of a running topology

The followingillustration shows how a simple topology would look like in operation. Thetopology consists of three components: one spout calledBlueSpout and two bolts calledGreenBolt andYellowBolt. The components are linkedsuch thatBlueSpout sends its output toGreenBolt, which in turns sends its own output toYellowBolt.



Figure 2:Example of a running topology

The GreenBoltwas configured as per the code snippet above whereas BlueSpout and YellowBoltonly set the parallelism hint (number of executors). Here is the relevant code:


And of courseStorm comes with additional configuration settings to control the parallelismof a topology, including:

TOPOLOGY_MAX_TASK_PARALLELISM:This setting puts a ceiling on the number of executors that can be spawned fora single component. It is typically used during testing to limit the number ofthreads spawned when running a topology in local mode. You can set this optionvia e.g.Config#setMaxTaskParallelism().

Update Oct 18: Nathan Marz informed me that TOPOLOGY_OPTIMIZE will be removed in a future release. I havetherefore removed its entry from the configuration list above.

7流式计算之Storm_wordcount实例

完成功能:统计指定分钟(注释:2、3、5…)窗口之内的Obj流量(注释:出现次数)排名(实际场景:统计最近2分钟之类IP的点击量的排名)

7.1 Topology入口

1 本地模式(嵌入Local):
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.starter.bolt.MergeObjects;
import storm.starter.bolt.RankObjects;
import storm.starter.bolt.RollingCountObjects;
 
public class RollingTopWords {
    public static void main(String[] args) throws Exception {
        final int TOP_N = 3;
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(1, new TestWordSpout(), 5);
        builder.setBolt(2, new RollingCountObjects(60, 10),4)
                .fieldsGrouping(1, new Fields("word"));
        builder.setBolt(3, new RankObjects(TOP_N), 4)
                .fieldsGrouping(2, new Fields("obj"));
        builder.setBolt(4, new MergeObjects(TOP_N))
                .globalGrouping(3);
        Config conf = new Config();
        conf.setDebug(true);


        LocalCluster cluster = new LocalCluster(); // 本地模式启动集群
        cluster.submitTopology("rolling-demo",conf, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}

2 集群模式

packagestorm.starter;
import storm.starter.bolt.MergeObjects;
import storm.starter.bolt.RankObjects;
import storm.starter.bolt.RollingCountObjects;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
 
public class RollingTopWords {
    public static void main(String[] args) throws Exception {
        final int TOP_N = 3;
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(1, new TestWordSpout(), 5);
        builder.setBolt(2, new RollingCountObjects(60, 10),4).fieldsGrouping(
                1, newFields("word"));
        builder.setBolt(3, new RankObjects(TOP_N),4).fieldsGrouping(2,
                newFields("obj"));
        builder.setBolt(4, newMergeObjects(TOP_N)).globalGrouping(3);
 
        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(20);
        conf.setMaxSpoutPending(5000);


        StormSubmitter.submitTopology("demo",conf,
               builder.createTopology());
        Thread.sleep(10000);
    }
}

7.2 TestWordSpout(随机产生一个word)

package backtype.storm.testing;
 
import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Random;
import org.apache.log4j.Logger;
 
 
public class TestWordSpout implements IRichSpout {
    public static Logger LOG = Logger.getLogger(TestWordSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;
 
    public TestWordSpout() {
        this(true);
    }
 
    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }
 
    public boolean isDistributed() {
        return _isDistributed;
    }
 
  public void open(Map conf, TopologyContext context, SpoutOutputCollector

                                                      collector) {
        _collector = collector;
    }
 
    public void close() {
 
    }
 
    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[]{"nathan", "mike", "jackson", "golda","bertels"};
        final Random rand = new Random();
        final String word =words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }
 
    public void ack(Object msgId) {
 
    }
 
    public void fail(Object msgId) {
 
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

7.3 RollingCountObjects(滚动计数word,并通过定时触发时间,清空计数列表)

package storm.starter.bolt;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
 
@SuppressWarnings("serial")
public class RollingCountObjects implements IRichBolt {
 //计数用的项
   

private HashMap<Object, long[]> _objectCounts =

new HashMap<Object, long[]>();

//计数桶的个数(用于Obj的循环计数)
    private int _numBuckets;

//清除计数桶的临时进程
    private transient Thread cleaner;
    private OutputCollector _collector;

//时间窗口(单位分钟)
    private int _trackMinutes;


   //构造函数
    public RollingCountObjects(int numBuckets, int trackMinutes) {
        _numBuckets = numBuckets;
        _trackMinutes = trackMinutes;
    }


       //返回Obj总数
    public long totalObjects (Object obj) {
        long[] curr = _objectCounts.get(obj);
        long total = 0;
        for (long l: curr) {
            total+=l;
        }
        return total;
    }


   //获取当前计算桶的下标
    public int currentBucket (int buckets) {
        return (currentSecond()  /secondsPerBucket(buckets)) % buckets;
    }


    public int currentSecond() {
        return (int) (System.currentTimeMillis() / 1000);
    }
  //每个桶的计数时间(单位秒)
    public int secondsPerBucket(int buckets) {
        return (_trackMinutes * 60 / buckets);
    }


   //每个桶的计数时间(单位毫秒)
    public long millisPerBucket(int buckets) {
        return (long) secondsPerBucket(buckets) * 1000;
    }
 
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) {
        _collector = collector;
        cleaner = new Thread(new Runnable() {
           @SuppressWarnings("unchecked")
            public void run() {
                Integer lastBucket =currentBucket(_numBuckets);
                while(true) {

                         //当前计数桶
                  int currBucket =currentBucket(_numBuckets);
                 if(currBucket!=lastBucket) {

 //下个计数桶
                     int bucketToWipe = (currBucket + 1) % _numBuckets;
                     synchronized(_objectCounts) {
                         Set objs = new HashSet(_objectCounts.keySet());
                         for (Object obj: objs) {
                           long[] counts = _objectCounts.get(obj);
                           long currBucketVal = counts[bucketToWipe];

 //将下个计数桶清空,为滚动计数准备
                           counts[bucketToWipe] = 0; //  *这行代码很关键*
                           long total = totalObjects(obj);
                           if(currBucketVal!=0) {
                               _collector.emit(new Values(obj, total));
                           }
                           if(total==0) {
                               _objectCounts.remove(obj);
                           }
                         }
                      }
                     lastBucket = currBucket;
                  }

//每个桶的计数时间-0~每个桶的计数时间)
                  long delta =millisPerBucket(_numBuckets) –  (System.currentTimeMillis()% millisPerBucket(_numBuckets));
                 Utils.sleep(delta);
                }
            }
        });
        cleaner.start();
    }
 
    public void execute(Tuple tuple) {
 
        Object obj = tuple.getValue(0);
        int bucket = currentBucket(_numBuckets);
        synchronized(_objectCounts) {
            long[] curr = _objectCounts.get(obj);

//obj存在则为其分配空间和计数桶
            if(curr==null) {
                curr = newlong[_numBuckets];
                _objectCounts.put(obj,curr);
            }

//obj的出现次数计数加1
            curr[bucket]++;
            _collector.emit(new Values(obj,totalObjects(obj)));
            _collector.ack(tuple); //标记元组已经处理
        }
    }
 
    public void cleanup() {
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("obj","count"));
    }
 
}

7.4 RankObjects(按top_n将Obj进行分类排序)

package storm.starter.bolt;
 
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.json.simple.JSONValue;
 
 
@SuppressWarnings("serial")
public class RankObjects implements IBasicBolt {
 
    @SuppressWarnings("rawtypes")
    List<List> _rankings = new ArrayList<List>();
 
    int _count;
    Long _lastTime = null;


   //初始化TOPN(由topology入口指定)
    public RankObjects(int n) {
        _count = n;
    }
 
   //比较函数
    @SuppressWarnings("rawtypes")
    private int _compare(List one, List two) {
 
        long valueOne = (Long) one.get(1);
        long valueTwo = (Long) two.get(1);
 
        long delta = valueTwo - valueOne;
        if(delta > 0) {
            return 1;
        } else if (delta < 0) {
            return -1;
        } else {
            return 0;
        }
 
    } //end compare


   //返回Obj所在下标
    private Integer _find(Object tag) {
        for(int i = 0; i < _rankings.size(); ++i) {
 
            Object cur = _rankings.get(i).get(0);
            if (cur.equals(tag)) {
                return i;
            }
 
        }
 
        return null;
 
    }
 
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context) {
 
    }
 
    @SuppressWarnings("rawtypes")
    public void execute(Tuple tuple, BasicOutputCollector collector){
        Object tag = tuple.getValue(0);

        Integer existingIndex = _find(tag);
        if (null != existingIndex) {

//计数窗口移动直接覆盖
            _rankings.set(existingIndex,tuple.getValues());
        } else {

//不存在则添加并自动赋下标
            _rankings.add(tuple.getValues());
        }
 
      //排序函数
        Collections.sort(_rankings, newComparator<List>() {
            public int compare(List o1, List o2){
                return _compare(o1,o2);
            }
        });
 
      //TOPN排名,多余则移除
        if (_rankings.size() > _count) {
            _rankings.remove(_count);
        }
 
        long currentTime = System.currentTimeMillis();
        if(_lastTime==null || currentTime >= _lastTime +2000) {

//List<<a,total1><b,total2>,<c,total3>>格式化为JSON串类似序列化与反序列化
            collector.emit(newValues(JSONValue.toJSONString(_rankings)));
            _lastTime = currentTime;
        }
    }
 
    public void cleanup() {
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("list"));
    }
 
}

7.5 MergeObjects(对排序结果进行归并)

package storm.starter.bolt;
 
import org.apache.log4j.Logger;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.json.simple.JSONValue;
 
 
@SuppressWarnings("serial")
public class MergeObjects implements IBasicBolt {
    public static Logger LOG = Logger.getLogger(MergeObjects.class);
 
    @SuppressWarnings({ "rawtypes", "unchecked"})
    private List<List> _rankings = new ArrayList();
    int _count = 10;
    Long _lastTime;
 
    public MergeObjects(int n) {
        _count = n;
    }
 
 
    @SuppressWarnings("rawtypes")
    private int _compare(List one, List two) {
        long valueOne = (Long) one.get(1);
        long valueTwo = (Long) two.get(1);
 
        long delta = valueTwo - valueOne;
        if(delta > 0) {
            return 1;
        } else if (delta < 0) {
            return -1;
        } else {
            return 0;
        }
 
    } //end compare
 
    private Integer _find(Object tag) {
        for(int i = 0; i < _rankings.size(); ++i) {
            Object cur = _rankings.get(i).get(0);
            if (cur.equals(tag)) {
                return i;
            }
        }
        return null;
    }
 
 
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context) {
    }
 
    @SuppressWarnings({ "unchecked", "rawtypes"})
    public void execute(Tuple tuple, BasicOutputCollector collector){
       //JSON串转换为List<<a,total1><b,total2>,<c,total3>>格式
        List<List> merging = (List)JSONValue.parse(tuple.getString(0));
        for(List pair : merging) {
            Integer existingIndex =_find(pair.get(0));
            if (null != existingIndex) {

//存在则覆盖
               _rankings.set(existingIndex, pair);
            } else {
                _rankings.add(pair);
            }
 
            Collections.sort(_rankings, newComparator<List>() {
                public int compare(Listo1, List o2) {
                    return_compare(o1, o2);
                }
            });
            if (_rankings.size() > _count) {

//排序后截取TOPN的排名
               _rankings.subList(_count, _rankings.size()).clear();
            }

        }
 
        long currentTime = System.currentTimeMillis();
        if(_lastTime==null || currentTime >= _lastTime +2000) {

//继续格式化,提供给下游使用(下游可以选择持久化)
            String fullRankings = JSONValue.toJSONString(_rankings);
            collector.emit(newValues(fullRankings));
            LOG.info("Rankings: " +fullRankings);
            _lastTime = currentTime;
        }
    }
 
    public void cleanup() {
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("list"));
    } 

7.6 总结

窗口的说明,可以这么去想象。当你需要测量5分钟内管道流出的东西量,管道里面有泥沙、水、油(很奇怪的组合,但只是为了说明事情举例),你需要通过一个 group来按照东西的性质区分,分到不同的管子去测量。而由于需要测量5分钟以内的,为了避免一个桶盛不下这段时间内的总量,我们分成10只桶去计算,每5分钟/10=30秒钟我们换一只桶去接,同时清空这批最早盛东西的桶。那么任意时刻这10个桶中的总量,则是代表了当前最近过去5分钟内所盛的量,最小偏差为桶记录的时间间隔。

由于在窗口计数中,每隔一段时间(统计时长)输出当前的统计结果一次,这时候需要外接一个处理用来对输出结果进行排序,并把排序的结果再按一定的间隔输出。RankActionObjects 即是做这个事情的bolt。内置一个内存对象,按照计数条件记录来保存最新一份数据,再定期的输出内存对象中所有数据的排序后结果。当有新的计数条件记录进来,需要更新内存对象中该条件的记录。这样我们就能得到一定时间间隔的,最新的统计情况。

当结果输出,则内存对象需要进行清零操作,以准备接收下一个对象。需要注意的是这里由于涉及到全排序,需要进行globalGrouping处理,以保证进行了全局排序

这种模式适合用于计算求topN一类的统计需求。详细代码可以参考storm示例中的RollingTopWords来进行理解。