当前位置: 代码迷 >> SQL >> SQL on Hadoop系统的小结分析(一)
  详细解决方案

SQL on Hadoop系统的小结分析(一)

热度:77   发布时间:2016-05-05 11:50:18.0
SQL on Hadoop系统的总结分析(一)
为什么非要把SQL放到Hadoop上? SQL易于使用。
那为什么非得基于Hadoop呢?the robust and scalable architecture of Hadoop

目前SQL on Hadoop产品主要有以下几种:
Hive, Tez/Stinger, Impala, Shark/Spark, Phoenix, Hawq/Greenplum, HadoopDB, Citusdata等。本文主要讨论Hive, Tez/Stinger, Impala, Shark以及传统开源数据仓库brighthouse的特点和最新进展;下一篇文章会讨论Phoenix, Hadapt/HadoopDB, Hawq/Greenplum。

在互联网企业中一般的基于Hadoop的数据仓库的数据来源主要有以下几个:
1,通过Flume/Scribe/Chukwa这样的日志收集和分析系统把来自Apache/nginx等Server cluster的日志收集到HDFS上,然后通过Hive创建Table时指定SerDe把非结构化的日志数据转化成结构化数据。
2,通过Sqoop这样的工具把用户和业务维度数据(一般存储在Oracle/MySQL中)定期导入Hive,那么OLTP数据就有了一个用于OLAP的副本了。
3,通过ETL工具从其他外部DW数据源里导入的数据。

目前所有的SQL on Hadoop产品其实都是在某个或者某些特定领域内适合的,没有silver bullet。像当年Oracle/Teradata这样的满足几乎所有企业级应用的产品在现阶段是不现实的。所以每一种SQL on Hadoop产品都在尽量满足某一类应用的特征。
典型需求:
1,  interactive query (ms~3min)
2,data analyst, reporting query (3min~20min)
3,data mining, modeling and large ETL (20 min ~ hr ~ day)
4,机器学习需求(通过MapReduce/MPI/Spark等计算模型来满足)

Hive
Hive是目前互联网企业中处理大数据、构建数据仓库最常用的解决方案,甚至在很多公司部署了Hadoop集群不是为了跑原生MapReduce程序,而全用来跑Hive SQL的查询任务。

对于有很多data scientist和analyst的公司,会有很多相同table的查询需求。那么显然每个人都从hive中查数据速度既慢又浪费资源。我们在online的数据库系统部署的时候都会在DB前面部署Redis或者memcache用于缓存用户经常访问的数据。那么OLAP应用也可以参考类似的方法,把经常访问的数据放到内存组成的集群中供用户查询。
Facebook针对这一需求开发了Presto,一个把热数据放到内存中供SQL查询的系统。这个设计思路跟Impala和Stinger非常类似了。使用Presto进行简单查询只需要几百毫秒,即使是非常复杂的查询,也只需数分钟即可完成,它在内存中运行,并且不会向磁盘写入。Facebook有超过850名工程师每天用它来扫描超过320TB的数据,满足了80%的ad-hoc查询需求。

目前Hive的主要缺点:
1,data shuffle时网络瓶颈,Reduce要等Map结束才能开始,不能高效利用网络带宽
2,一般一个SQL都会解析成多个MR job,Hadoop每次Job输出都直接写HDFS,性能差
3,每次执行Job都要启动Task,花费很多时间,无法做到实时
4,由于把SQL转化成MapReduce job时,map,shuffle和reduce所负责执行的SQL功能不同。那么就有Map->MapReduce或者MapReduce->Reduce这样的需求。这样可以降低写HDFS的次数,从而提高性能。

目前Hive主要的改进(主要是体现在 hive 0.11版本上):

1,同一条hive sql解析出的多个MR任务的合并。
由Hive解析出来的MR jobs中有非常多的Map->MapReduce类型的job,可以考虑把这个过程合并成一个MRjob。https://issues.apache.org/jira/browse/HIVE-3952
2,Hive query optimizer(查询优化器是Hive需要持续不断优化的一个topic)

http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.0.0.2/ds_Hive/optimize-joins.html

Joins where one side fits in memory
Star schema join的改进,就是原来一个大表和多个小表在不同column匹配的条件下join需要解析成多个map join + MR job,现在可以合并成一个MR job
这个改进方向要做的就是用户不用给太多的hint,hive可以自己根据表的大小、行数等,自动选择最快的join的方法(小表能装进内存的话就用map join,Map join能和其他MR job合并的就合并)。这个思路跟cost-based query optimizer有点类似了,用户写出来的SQL在翻译成执行计划之前要计算那种执行方式效率更高。

3,ORCFile
ORCFile是一种列式存储的文件,对于分析型应用来说列存有非常大的优势。 http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.0.0.2/ds_Hive/orcfile.html
原来的RCFile中把每一列看成binary blob,没有任何语义,所以只能用通用的zlib,LZO,Snappy等压缩方法。
ORCFile能够获取每一列的类型(int还是string),那么就可以使用诸如dictionary encoding, bit packing, delta encoding, run-length encoding等轻量级的压缩技术。这种压缩技术的优势有两点:一是提高压缩率;二是能够起到过滤无关数据的效果
现在ORCFile中主要有三种编码:

bit编码,所有数据类型都可以用。Google’s protocol buffers and uses the high bit to represent whether this byte is not the last and the lower 7 bits to encode data
run-length encoding(行程长度压缩算法),int类型专用。
dictionary encoding,string类型专用。同时这个dictionary还能帮助过滤查询中的predicate条件。
Run length Encoding对某些列压缩会减少存储3-4个数量级,对内存提升也有2-3个数量级,Dictionary Encoding一般对磁盘空间减少大概20倍,对内存空间大概减少5倍,根据Google PowerDrill的实验,在常见的聚合查询中这些特殊的编码方式会对查询速度有2-3个数量级的提升.

Predicate Pushdown:原来的Hive是把所有的数据都读到内存中,然后再判断哪些是符合查询需求的。在ORCFile中数据以Stripe为单元读取到内存,那么ORCFile的RecordReader会根据Stripe的元数据(Index Data,常驻内存)判断该Stripe是否满足这个查询的需求,如果不满足直接略过不读,从而节省了IO。

关于ORCFile的压缩效果,使用情况和性能可以参考hortonworks的博客

http://hortonworks.com/blog/orcfile-in-hdp-2-better-compression-better-performance/
未来ORCFile还会支持轻量级索引,就是每一列中以1W行作为一组的最大值和最小值。

通过对ORCFile的上述分析,我想大家已经看到了brighthouse的影子了吧。都是把列数据相应的索引、统计数据、词典等放到内存中参与查询条件的过滤,如果不符合直接略过不读,大量节省IO。关于brighthouse大家可以参考下面的分析。

4,HiveServer2的Security和Concurrency特性

http://blog.cloudera.com/blog/2013/07/how-hiveserver2-brings-security-and-concurrency-to-apache-hive/

HiveServer2能够支持并发客户端(JDBC/ODBC)的访问。
Cloudera还搞了个Sentry用于Hadoop生态系统的的安全性和授权管理方面的工作。
这两个特点是企业级应用Hadoop/Hive主要关心的。

5,HCatalog Hadoop的统一元数据管理平台
目前Hive存储的表格元数据和HDFS存储的表格数据之间在schema上没有一致性保证,也就是得靠管理员来保证。目前Hive对列的改变只会修改 Hive 的元数据,而不会改变实际数据。比如你要添加一个column,那么你用hive命令行只是修改了了Hive元数据,没有修改HDFS上存储的格式。还得通过修改导入HDFS的程序来改变HDFS上存储的文件的格式。而且还要重启Hive解析服务,累坏了系统管理员。

Hadoop系统目前对表的处理是’schema on read’,有了HCatlog就可以做到EDW的’schema on write’。
HCatlog提供REST接口提供元数据服务,有利于不同平台(HDFS/HBase/Oracle/MySQL)上的不同数据(unstructured/semi-structured/structured)共享。能够把Hadoop和EDW结合起来使用。
HCatlog对用户解耦了schema和storage format。举个例子吧,在写MR任务的时候,目前是把所有的行数据都当成Text来处理,Text一点点解析出各个Column需要编程人员来控制。有个HCatlog之后编程人员就不用管这事了,直接告诉它是哪个Database->Table,然后schema可以通过查询HCatlog来获得。也省得数据存储格式发生变化之后,原来的程序不能用的情况发生。
6,Windowing and Analytics Functions的支持。

支持Windowing functions : lead, lag, first_value, last_value
支持over clause : partition by, order by, window frame
支持analytics functions : rank, row_number, dense_rank, cume_dist, percent_rank, ntile
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics

https://issues.apache.org/jira/browse/HIVE-4197

Tez/Stinger(升级版的Hive)
Tez是一种新的基于YARN的DAG计算模型,主要是为了优化Hive而设计的。目前Tez/Stinger主要是Hortonworks在搞,他们希望以后把Hive SQL解析成能够在Tez上跑的DAG而不是MapReduce,从而解决计算实时性的问题。Tez的主要特点有:

底层执行引擎不再使用MR,而是使用基于YARN的更加通用的DAG执行引擎
MR是高度抽象的Map和Reduce两个操作,而Tez则是在这两个操作的基础上提供了更丰富的接口。把Map具体到Input, Processor, Sort, Merge, Output,而Reduce也具体化成Input, Shuffle, Sort, Merge, Processor, Output。在MR程序里,编程人员只需编写对应的Processor逻辑,其他的是通过指定几种具体实现来完成的;而在Tez里面给我们更大的自由度。其实这个跟Spark有点类似了,都是提供更丰富的可操作单元给用户。
传统的Reduce只能输出到HDFS,而Tez的Reduce Processor能够输出给下一个Reduce Processor作为输入。
Hot table也放到内存中cache起来
Tez service:预启动container和container重用,降低了每次Query执行计划生成之后Task启动的时间,从而提高实时性。
Tez本身只是YARN框架下得一个library,无需部署。只需指定mapreduce.framework.name=yarn-tez
http://dongxicheng.org/mapreduce-nextgen/apache-tez-newest-progress/

Tez/Stinger还有一个最重要的feature : Vectorized Query Execution (该feature在HDP 2.0 GA中会提供)

https://issues.apache.org/jira/browse/HIVE-4160
目前Hive中一行一行的处理数据,然后调用lazy deserialization解析出该列的Java对象,显然会严重影响效率。
多行数据同时读取并处理(基本的比较或者数值计算),降低了一行一行处理中过多的函数调用的次数,提高了CPU利用率和cache命中率
需要实现基于向量的vectorized scan, filter, scalar aggregate, group-by-aggregate, hash join等基本操作单元。
未来工作方向:
Cost-based optimizer,基于统计选择执行策略,例如多表JOIN时按照怎样的顺序执行效率最高。
统计执行过程中每个中间表的Row/Column等数目,从而决定启动多少个MR执行

Impala
Impala可以看成是Google Dremel架构和MPP (Massively Parallel Processing)结构的混合体。

https://github.com/cloudera/impala

Dremel论文: http://research.google.com/pubs/pub36632.html

优点:

目前支持两种类型的JOIN:broadcast join和partition join。对于大表JOIN时由于内存限制,装不下时就要dump部分数据到磁盘,那样就会比较慢
Impala各个任务之间传输数据采用的是push的方式(MR采用的是pull的方式),也就是上游任务计算完就会push到下游,这样能够分散网络压力,提高job执行效率。
Parquet列存格式,同时能够处理嵌套数据。通过嵌套数据以及扩展的SQL查询语义,在某些特定的场景上避开了JOIN从而解决了一部分性能的bottleneck。(Impala目前支持LZO Text, Sequence, Avro, RCFile, Parquet, 不支持ORCFile。但是未来支持多种存储文件格式是个趋势,所以Impala读取文件这部分功能也需要尽快插件化。)
Cloudera Manager 4.6以后会有slow query的分析功能
Runtime Code Generation http://blog.cloudera.com/blog/2013/02/inside-cloudera-impala-runtime-code-generation/
impala可以直接使用硬盘上的数据而不经过hdfs
缺点:

impala不会按照group by的列排序
目前不支持UDF,impala 1.2即将支持Hive UDFs(Java写的)和Impala native UDFs and UDAs(接口类似PosgreSQL)
不支持像Hive的Serializer/Deserializer,从而使得它做从非结构化到结构化数据的ETL工作比较麻烦。所以本质上讲Impala适合MR配合做ETL之后的查询工作。
由于Impala的设计初衷是short query,所以不支持fault tolerance。如果参与查询的某个node出错,Impala将会丢弃本次查询。
安全方面的支持还比较差。impalad之间传输的数据没有加密,不支持表或者列级别的授权。
每个PlanFragment执行尽量并行化,但是有的时候并不是很容易。例如Hash Join需要等到其中一个表完全Scan结束才能开始。
虽然有这么多缺点,但是很多公司还是开始尝试Impala了。以百度为例,百度尝试把MySQL接入Impala的后端作为存储引擎,同时实现相应操作对应的PlanFragment,那么用户来的query还是按照原来的解析方法解析成各种PlanFragment,然后直接调度到对应的节点(HDFS DataNode/HBase RegionServer/MySQL)上执行。会把某些源数据或者中间数据放到MySQL中,用户的query涉及到使用这部分数据时直接去MySQL里面拿。
Shark/Spark

由于数据能放到内存尽量放到内存,使用内存非常aggressive。优点是做JOIN时会比较快,缺点是占用内存太大,且自行管理内存,占用内存后不会释放。
由于Shark借用了Hive的codebase,所以在SQL,SerDes,UDF支持方面和Hive是完全兼容的。
支持fault tolerance
性能:
特别简单的select…where查询,shark性能的提升不明显。(因为hive也不怎么费时间)
但是如果查询比较复杂select…join…where…group by,hive的job数目会比较多,读写HDFS次数增多,时间自然会变长。当内存还足够大的时候shark性能是最好的,如果内存不够装下所有的数据时性能会下降,但还是会比Hive好很多。

SQL on Hadoop产品需要向传统数据仓库学习的地方
以开源数据仓库brighthouse(基于MySQL的数据仓库存储引擎)为例。
VLDB 2008 论文 <<Brighthouse: An Analytic Data Warehouse for Ad-hoc Queries>>

brighthouse的SQL解析用的是MySQL的代码,开发了brighthouse专用的optimizer,executor以及storage engine
brighthouse的数据存储通过三层来组织:Data Pack, Data Pack Node, Knowledge Node

DP(Data Pack):brighthouse是列存储的,每个DP存储一列中64K个单元的数据。
DPN(Data Pack Node):DPN和DP是一对一的关系,DPN中记录每个DP数据对应的一些统计值(max,min,count,sum)
KN(Knowledge Node):DP的更详细的数据信息和DP之间关系的信息
KN又分为一下三个部分:

HISTs(Histograms):数值类型列的统计直方图,能够快速判断这个DP是否符合查询条件。
CMAPs(Character Maps):文本类型的位图,用于快速查找字符。(优化关键字like)
Pack-To-Pack:等值JOIN操作时产生的两个列(DP)之间关系的位图。
DPN和KN相当于DP的一些统计信息,占整个DP的1%的存储空间,所以可以轻松装入内存。他们是为了快速定位哪些DP是跟这个query相关(relevant)的,哪些是不相关(irrelevant)的,哪些是可能相关(suspect)的。从而减小IO读取的数据量,提高性能。

性能测试:http://www.fuchaoqun.com/tag/brighthouse/
从这个性能测试中可以看出:
1,压缩率:infobright比MyISAM/tar.gz的压缩率都要高很多
2,查询性能:跟建了索引的MyISAM表相比,查询速度也要快3-6倍

总之,大家都缺少的是:
1,workload management or query optimization
多个表的JOIN如何执行,例如3个表的JOIN会有6种执行策略,那么哪一种才是效率最高的呢。显然要通过计算每种执行顺序的开销来获得。在传统数据库或者数据仓库领域(Oracle/Teradata/PostgreSQL)都有非常好的查询优化器,而在分布式系统中该如何衡量这些指标(磁盘IO,网络带宽,内存)与最后查询效率之间的关系是个需要认真研究的问题。
2,关联子查询correlated sub-queries还是没有谁能够实现。
在TPC-H中又很多关联子查询的例子,但是现在的SQL on Hadoop产品都不支持。听Impala的人说,他们客户对这个的需求不是很强烈,大部分关联子查询可以转化成JOIN操作。但是目前的商业产品像Hawq/Greenplum都是支持关联子查询的。