当前位置: 代码迷 >> 综合 >> spark sql优化:小表大表关联优化 union替换or broadcast join
  详细解决方案

spark sql优化:小表大表关联优化 union替换or broadcast join

热度:93   发布时间:2023-09-14 14:32:00.0

----原语句(运行18min)

SELECTbb.ipFROM(SELECTip ,sum(click) click_num,round(sum(click) / sum(imp), 4) user_click_rateFROMschema.srctable1WHEREdate = '20171020'AND ip IS NOT NULLAND imp > 0GROUP BY ip) bbLEFT OUTER JOIN(SELECTround(sum(click) / sum(imp), 4) avg_click_rateFROMschema.srctable1WHEREdate = '20171020') aaLEFT OUTER JOIN schema.dstable ccon cc.ip = bb.ipWHERE  cc.ip is nullAND(bb.user_click_rate > aa.avg_click_rate * 3AND click_num      > 500)OR(click_num > 1000)

分析:

1、aa表存放的就是一个指标数据,1条记录,列为小表
2、bb表存放的是按ip聚合的明细数据,记录很多,列为大表
3、cc表用来过滤ip,数量也很小,列为过滤表,作用很小。
查看执行计划,发现bb与aa进行left outer join时,引发了shuffle过程,造成大量的磁盘及网络IO,影响性能。

解决策略

优化方案1:调整大小表位置,将小表放在左边后,提升至29s (该方案一直不太明白为啥会提升,执行计划里显示的也就是大小表位置调换下而已,跟之前的没其他区别)
优化方案2: 将 or 改成 union,提升至35s(各种调整,一直怀疑跟or有关系,后面调整成union其他不变,果真效率不一样;但方案1只是调整了下大小表顺序,并未调整其他,其效率同样提升很大;不太明白sparksql内部到底走了什么优化机制,后面继续研究);

优化方案3: 采用cache+broadcast方式,提升至20s(该方案将小表缓存至内存,进行map侧关联)

方案具体实施

----方案2:or 改成 union(运行35s)

select aa.ipfrom (SELECT bb.ip ipFROM(SELECTip                  ,sum(click) click_num,round(sum(click) / sum(imp), 4)user_click_rateFROMschema.srctable1WHEREdate    = '20171020'AND ip IS NOT NULLAND imp > 0GROUP BY  ip) bbLEFT OUTER JOIN(SELECT round(sum(click) / sum(imp), 4) avg_click_rateFROM schema.srctable1WHERE date = '20171020')  aaWHERE  ( bb.user_click_rate > aa.avg_click_rate * 3AND click_num > 20 )union SELECTbb.ip ipFROM(SELECTip  , sum(click) click_num,round(sum(click) / sum(imp), 4)  user_click_rateFROM schema.srctable1WHEREdate    = '20171020'AND ip IS NOT NULLAND imp > 0GROUP BY  ip)  bbLEFT OUTER JOIN(SELECTround(sum(click) / sum(imp), 4) avg_click_rateFROM schema.srctable1WHERE  date = '20171020')  aaWHERE click_num > 40) aaLEFT OUTER JOIN schema.dstable ccon  aa.ip = cc.ipwhere cc.ip is null

-----cache+broadcast方式(20s)
原理:使用broadcast将会把小表分发到每台执行节点上,因此,关联操作都在本地完成,基本就取消了shuffle的过程,运行效率大幅度提高。

cache table ctaasSELECT  round(sum(click) / sum(imp), 4) avg_click_rateFROM schema.srctable1WHERE date = '20171020';INSERT into TABLE schema.dstableSELECT  bb.ipFROM  (SELECTip  ,sum(click) click_num,round(sum(click) / sum(imp), 4)  user_click_rateFROM schema.srctable1WHEREdate    = '20171020'AND ip IS NOT NULLAND imp > 0GROUP BY  ip) bbLEFT OUTER JOIN cta aaLEFT OUTER JOIN schema.dstable ccon cc.ip = bb.ipWHERE cc.ip is nullAND (bb.user_click_rate > aa.avg_click_rate * 3AND click_num > 500)OR(click_num > 1000)

注意:
cache 表不一定会被广播到Executor,执行map side join,还受另外一个参数:spark.sql.autoBroadcastJoinThreshold影响,该参数判断是否将该表广播;
spark.sql.autoBroadcastJoinThreshold参数默认值是10M,所以只有cache的表小于10M的才被广播到Executor上去执行map side join。

  相关解决方案