今天忙到飞起(到现在还没完),写一篇超短的小技巧吧。
HyperLogLog是去重计数的利器,能够以很小的精确度误差作为trade-off大幅减少内存空间占用,在不要求100%准确的计数场景极为常用。关于它的数学原理,看官可参见之前写过的《再谈基数估计之HyperLogLog算法》,不再赘述了。
在用Flink做实时计算的过程中,也短不了做去重计数,比如统计UV。我们当然可以直接借助Redis的HyperLogLog实现,但是要在Flink job内直接整合HyperLogLog该怎么做呢?
先引入如下Maven依赖项:
<dependency><groupId>net.agkn</groupId><artifactId>hll</artifactId><version>1.6.0</version><scope>compile</scope>
</dependency>
下面的聚合函数即可实现从WindowedStream按天、分键统计PV和UV。
WindowedStream<AnalyticsAccessLogRecord, Tuple, TimeWindow> windowedStream = watermarkedStream.keyBy("siteId").window(TumblingEventTimeWindows.of(Time.days(1))).trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)));windowedStream.aggregate(new AggregateFunction<AnalyticsAccessLogRecord, Tuple2<Long, HLL>, Tuple2<Long, Long>>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Long, HLL> createAccumulator() {return new Tuple2<>(0L, new HLL(14, 6));}@Overridepublic Tuple2<Long, HLL> add(AnalyticsAccessLogRecord record, Tuple2<Long, HLL> acc) {acc.f0++;acc.f1.addRaw(record.getUserId());return acc;}@Overridepublic Tuple2<Long, Long> getResult(Tuple2<Long, HLL> acc) {return new Tuple2<>(acc.f0, acc.f1.cardinality());}@Overridepublic Tuple2<Long, HLL> merge(Tuple2<Long, HLL> acc1, Tuple2<Long, HLL> acc2) {acc1.f0 += acc2.f0;acc1.f1.union(acc2.f1);return acc1;}
});
上述开源HyperLogLog组件的主要方法简述如下:
HLL(int log2m, int regwidth)
创建一个HyperLogLog对象。log2m即总分桶数目以2为底的对数,regwidth则是真正用来做基数估计的比特的下标值宽度。根据Redis的思路,log2m=14,regwidth=6,即可以仅用最多12kB内存,以0.81%的误差计算接近264的基数。void addRaw(long rawValue)
向HyperLogLog中插入元素。如果插入的元素非数值型的,则需要hash过后(推荐用Murmur3等比较快的哈希算法)再插入。long cardinality()
返回该HyperLogLog中元素的基数。void union(HLL other)
将两个HyperLogLog结构合并为一个。
该HyperLogLog组件如同Redis一样实现了稀疏存储与密集存储两种方式,以进一步减少内存占用量。其源码不难理解,看官可以自行参看。
最后,如果一定追求100%准确,该怎么办呢?普通的位图法显然不合适,应该采用压缩位图,如笔者之前提到过的RoaringBitmap。
继续忙去了。民那好梦。