Hive 中的 GroupBy, Distinct 和 Join
- GroupBy
- 几种 Mode
- 原理
- 相关参数
- Distinct
- Single Distinct
- Multi Distinct
- Join
- Common Join
- Map Join——Hive MapJoin 优化历程、FaceBook Join优化
- Skew Join——Skewed Join Optimization
- Bucket Join
本文将介绍 Hive GroupBy,Distinct 和 Join 的原理与相关调优参数,帮助大家深入理解Hive。
GroupBy
几种 Mode
根据调用UDAF的不同接口,Hive GroupBy 算子分为以下几类Mode,具体请看代码注释 :)
/*** Group-by Mode: COMPLETE: complete 1-phase aggregation: iterate, terminate* PARTIAL1: partial aggregation - first phase: iterate, terminatePartial* PARTIAL2: partial aggregation - second phase: merge, terminatePartial* PARTIALS: For non-distinct the same as PARTIAL2, for distinct the same as* PARTIAL1* FINAL: partial aggregation - final phase: merge, terminate* HASH: For non-distinct the same as PARTIAL1 but use hash-table-based aggregation* MERGEPARTIAL: FINAL for non-distinct aggregations, COMPLETE for distinct* aggregations.*/
原理
- Map端聚合: Map端进行预聚合,减少shuffle数据量,类似于MR中的Combiner。默认情况下,Hive 会尽可能地使用 Map 端Aggregation,但是如果 Hash Map不能有效地降低内存使用,那么会降级到普通的Aggregation,即 Map 端仅做Shuffle Write,Reducer执行真正的聚合运算。具体可参考:Hive执行过程中最后是否有map-side aggregation。
- 倾斜:生成的查询计划有两个 MapReduce 任务。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,并输出结果。这样处理的结果是,相同的 Group By Key 有可 能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处 理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。(PS: 目前Hive实现中有数据质量问题,请慎用!)
相关参数
# 是否开启mapper端聚合
hive.map.aggr# 是否开启,如果数据倾斜,是否优化group by为两个MR job
#该配置会触发hive增加额外的mr过程,随机化key后进行聚合操作得到中间结果,再对中间结果执行最终的聚合操作。
#count(distinct)操作比较特殊,无法进行中间的聚合操作,因此该参数对有count(distinct)操作的sql不适用。
hive.groupby.skewindata
# 用于map端聚合的hashtable最大可用内存,如果超过该内存比例,将flush到磁盘
hive.map.aggr.hash.force.flush.memory.threshold# 可以用于mapper端hatable的内存比例
hive.map.aggr.hash.percentmemory (Default: 0.5) – Percent of total map task memory that can be used for hash table.
# 如果hashtable大小/输入行数 大于该阈值,那么停止hash聚合,转为sort-based aggregation
hive.map.aggr.hash.min.reduction (Default: 0.5)
# 每隔多少行,检测hashtable大小和input row比例是否超过阈值
hive.groupby.mapaggr.checkinterval # 是否开启bucket group by
hive.optimize.groupby
Distinct
Single Distinct
当 query 只有一个distinct expression时,那么将 distinct expression作为一个partition key做shuffle,然后利用 MapReduce / Tez 的排序,在 reducer 端取最后一个key的即可完成去重功能。
Multi Distinct
如果查询中有多个 distinct expression,同一条record,会生成多条记录进行Shuffle,增大Shuffle量。考虑以下query:
select dealid, count(distinct uid), count(distinct date) from order group by dealid;
不考虑 Map Aggregation的情况下,上述 query 实际执行计划如下图所示。 Reducer端接收到的 key 中元素分别是:dealid
, distinct expression的序号
, distinct expression
。
https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html
具体代码可参考ReduceSinkOperator.process方法,代码片段如下图所示。
Join
Common Join
- 原理:
Map阶段:读取源表的数据,Map输出时候以Join on条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key;Map输出的value为join之后所关心的(select或者where中需要用到的)列,同时在value中还会包含表的Tag信息,用于标明此value对应哪个表。
Shuffle阶段:根据key的值进行hash,并将key/value按照hash值推送至不同的reduce中,这样确保两个表中相同的key位于同一个reduce中。
Reduce阶段:根据key的值完成join操作,期间通过Tag来识别不同表中的数据。
- 适用场景:适用于所有类型的表关联与其他类型join不支持的join类型,比如:full outer join.
Map Join——Hive MapJoin 优化历程、FaceBook Join优化
- 原理:如果关联的表足够小,那么可以将小表加载到mapper的内存中,在map端完成join,减少shuffle和reduce阶段。MapReduce Local Task会在真正的MapReduce Join Task之前,从HDFS读取小表,然后将其转成一个tar文件,最后将文件上传至HDFS Cache.MapReduce Local Task运行过程中,可能由于内存不足而失败,可以通过设置
hive.mapjoin.localtask.max.memory.usage
来改变Local Task可使用的内存大小。
- Conditional Task: 当如果N-1张小表大小和小于“hive.mapjoin.smalltable.filesize”这个值,则创建Conditional Task。Conditional Tasks把每张表都是小表的情况都考虑进去了,然后加上一个所有表都不是小表的Common Join Task。 如:
SELECT * FROM cities JOIN sales on cities.cityId=sales.cityId;
则Conditional Tasks结构如下。在作业执行过程中,Hive将获取到所有关联表的元数据,如:大小,位置等,然后选择一个从众多conditional tasks中选择一个task作为真正执行的task。
- BuckUp Task:在MapReduce Local Task执行过程中,如果由于内存不足,导致任务执行失败,此时会直接执行Common Join Task。(PS:生产环境上能正常走到BackUp Task的case比较少,一般会由于客户端OOM,任务直接退出了。)
- 相关参数:
# 是否自动转换common join为map joinset hive.auto.convert.join=true;# 如果join的小表和小于该阈值,会尝试将Common join 转换成map join。通过explain命令,可以发现Operator树中有conditional Operator。 如果n-1张表大小和,小于该阈值,则生成conditional tasks。hive.smalltable.filesize or hive.mapjoin.smalltable.filesize# 如果join的小表小于该阈值,会直接将Common join转换成Map join。需要考虑到数据解压之后的实际大小,hive表在被解压后,文件大小可能会增大10倍。hive.auto.convert.join.noconditionaltask.size
- 适用场景:小表(维度表)join大表(事实表),不适用与Right/Full outer join.
- 可以优化的点:
- 根据作业中mapper的数量,来动态调整小表HashTable文件在HDFS上的副本数量。如果mapper有成千万个,可以适当提高cache的副本数量。
- 通过提前采样小表中的值,动态调整小表大小阈值。如果采样发现小表中重复row较多,可以适度提高小表的上限。
Skew Join——Skewed Join Optimization
- 原理:
- 假设A、B两张表相互Join,A表在key=1上倾斜。
- 若key为1,使用B的key=1的哈希表来计算结果,全部在mapper端完成;
- A表其他key发送到reducer端来join,这个reduce也会从B的mapper中得到对应需要连接的数据;
- 整个过程中,需要读两次B表,并且需要将两类结果UNION起来。
- 相关参数:
set hive.optimize.skewjoin=true;
set hive.skewjoin.key=100000;
- 适用场景:
- 其中一张表有大量数据集中在某几个Key上
- 非倾斜表B,关于A的倾斜key的所有数据,需要能全部加载至内存中
- 需要提前预知那些哪些Key是倾斜的
Bucket Join
- 原理:Join全部在Mapper端进行,只有相互匹配的bucket才会聚集在一起,mapper1只会拉取a,c表的bucket1;Mapper3只会拉取a,c表的bucket2。整个任务在mapper端完成,去除了Shuffle和reduce阶段。
- 相关参数:
set hive.optimize.bucketmapjoin=true;
- 适用场景分析:
- 关联表的分桶数量成倍数
- 表的分桶key,需要与join字段相同
参考资料:
1) Hive SQL的编译过程: https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html
2)MapJoinOptimization: https://cwiki.apache.org/confluence/display/Hive/MapJoinOptimization
3)FaceBook Join Optimization in Apache Hive: https://engineering.fb.com/core-data/join-optimization-in-apache-hive/
4) Skewed Join Optimization: https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimization