当前位置: 代码迷 >> SQL >> SQL on Hadoop系统的小结分析(二)
  详细解决方案

SQL on Hadoop系统的小结分析(二)

热度:18   发布时间:2016-05-05 11:50:19.0
SQL on Hadoop系统的总结分析(二)
上篇主要讨论了Hive, Stinger/Tez, Impala, Shark这些SQL on Hadoop产品,这篇接着讨论Phoenix, Hadapt, Hawq。

Phoenix

Salesforce开源的基于HBase的SQL查询系统,建立在HBase client API, coprocessors, custom filter的基础之上。
基本原理是将一个对于HBase client来说比较复杂的查询转换成一系列Region Scan,结合create table时hook的coprocessor和custom  filter在多台Region Server上进行并行查询, 汇总各个Scan结果输出给调用程序的ResultSet。说白了就是看大家用HBase client API开发程序太麻烦了,就弄了个JDBC包装,这样对于software engineer来说降低了开发成本,同时对于简单单表查询性能损失不大。
种种迹象表明,Phoenix应该不是个优化的OLAP系统,更像是一个用于简单单表查询,过滤,排序,检索的OLTP系统。
优点:

HBase默认存储的数据类型都是字符串,但Phoenix支持更多的数据类型(int, float, char, varchar, time, date)
使用JDBC操作数据,而不是HBase client API
在RegionServer端通过coprocessor过滤where条件,执行aggregation函数。Hive on HBase把SQL转化成MapReduce去查询HBase;Impala on HBase把SQL转化成PlanFragment执行计划去查询HBase; Phoenix把SQL转化成对HBase client API和coprocessor的调用,这三者的架构是相似的。不同点就是Hive on HBase和Impala on HBase都没有把coprocessor利用好,都是通过HBase client API把数据读到他们自己进程的内存之后才进行的filter, aggregation等操作。所以理论上讲前两种架构设计的产品性能不可能超过直接调用HBase Client的方式。
从查询的角度来看HBase的column主要分为两类:primary key(row key column)和other columns。主要的不同是row key column能够利用HBase Region Server的index, filter, sort等特性,而other columns没有这些特性,只能通过二级索引辅助做一些优化。Phoenix能够在HBase上创建二级索引用于优化non row key columns的条件查询(目前只支持在static table上建二级索引,一个更通用的HBase二级索引实现方法可以参考华为开源的这个实现https://github.com/Huawei-Hadoop/hindex)。
salting of row keys to evenly distribute write load
如果是row key column上的IN/OR/LIKE条件,可以通过Region Server的skip scan filter优化。
Dynamic columns支持(跟RDBMS的dynamic schema change类似),也就是用户不需要在create table的时候指定所有的column,后面什么时候需要随时添加。这个功能主要依赖于HBase的动态添加column的功能。
AutoCommit=false时(默认是false)把所有操作先缓存在客户端,只有你显示commit时才一次批量提交到HBase,SQL解析优化全是在客户端做,这个有点事务的意思。
缺点:

不支持JOIN,考虑到HBase的设计初衷是尽量用冗余数据减少复杂的JOIN操作,实际上可以把相关数据都放在同一个表里,而不需要为了减少数据冗余,拆分到多个表中,所以很大程度也可以认为这不是一个缺点。
从架构上看也仅是把SQL转成HBase Client的API和coprocessor的调用,而且coprocessor还不适合大规模数据的传输,所以如果中间结果的数据量还是比较大的话性能问题还是很明显的。
这个缺点是所有的基于HBase的SQL系统都有的(包括Hive on HBase和Impala on HBase)。不管什么请求到HBase Region Server这边都得通过RegionScanner,这个接口不是面向OLAP型应用优化的存储文件读取接口。例如RegionScanner的实现里好多条件比较,是不利于全表扫描的。所以全表扫描的应用不如一个一个地读HFile,当然前提是得离线把memstore的数据都dump到hfile。目前coprocessor也是走的RegionScanner。这部分要想改得改Region Server代码了,那就是Apache HBase社区的事了。
还有个问题就是coprocessor的问题了,由于coprocessor和HBase Region Server是在一个JVM里面,所以当coprocessor计算逻辑非常复杂,中间结果数据量很大的时候会占用大量内存。同时coprocessor不是流式地读取数据,某些节点数据积累过多也会造成内存不够用的问题。
RoadMap:
JOIN支持,虽然有点不符合设计初衷,但是大家都支持,就我不支持,太out of fashion了吧。
Transaction支持,通过参考https://github.com/yahoo/omid的方法。
Online Schema Evolution,动态改变column的类型,rename等。https://github.com/forcedotcom/phoenix/wiki
Hadapt/HadoopDB

http://hadapt.com/product/

http://db.cs.yale.edu/hadoopdb/hadoopdb.pdf

架构和Hive相似,底层存储引擎有两种:HDFS和RDBMS(PostgreSQL),一个DataNode节点上有一个RDBMS节点。
提供两种接口:SQL和MapReduce,SQL也是解析成MapReduce job来执行的,所以总的来说执行引擎都是MR。
把多个MapReduce任务,转换成单node上的SQL+一个MR(data shuffle),这个跟水平压缩,垂直压缩类似,尽量减少SQL解析出的MR task个数,减少任务之间写HDFS的IO数据量。把一个SQL拆解成两部分:适合SQL做的用单机SQL,不适合的用MR(data shuffle)
和Hive的不同点在于Hive只能操控HDFS上的数据,而Hadapt可以操控HDFS和RDBMS两种数据来源。对于RDBMS这个数据源来说,数据被预先load到分布式的RDBMS节点中,有统一的Catalog管理所有RDBMS中的数据。例如Map中的有些执行逻辑直接通过一个在RDBMS上执行的SQL来获得(修改InputFormat),然后使用MapReduce来做JOIN/Group By。而且如果在数据被load到分布式PostgreSQL节点上时分布情况正好符合group by/order by的条件,那么还省得通过MapReduce的shuffle来做了。
Hadapt的本质还是把SQL解析成MR任务来做,不是MPP的思想,所以Hive具备的有些缺点(启动时间长,JOIN效率较低)它也是具有的。还有如果想要join/group by/order by能够在RDBMS数据源之间高效执行,还得考虑数据预分布的问题。
需要统一的元数据和数据一致性服务用于管理HDFS上的数据导入分布式PostgreSQL以及分区。
在执行多个Query的时候,后面的query能够利用前面query的查询结果(已经把前面Query的查询结果可以写到RDBMS中,有点类似于数据仓库中的物化视图的概念),从而可以提高查询的性能。
Combine structured and unstructured data in single query。现在很多公司为了统一计算平台,把放到RDBMS中的数据也放到HDFS上存一份,要不没法和HDFS中的非结构化数据做JOIN。在Hadapt这里用户通过一个Query可以操控两种数据,不用分两个步骤走了。
现在企业级应用大多使用的方案(ebay使用的是Hadoop+Teradata)是Hadoop+MPP/open source+commercial software的方式,即通过Hadoop批处理unstructured data(进行ETL操作)然后通过connector导入MPP进行structure data的query操作。但是这只是临时的替代方案,Hadapt说invisible loading(http://hadapt.com/blog/2012/09/05/invisible-loading-a-new-paradigm-for-loading-from-unstructured-to-structured-storage/ )才是最合理的,这样企业就有了一个unified analytic platform。但是用户把数据load到RDBMS之后就失去了在HDFS上存储的robust和scalable的特征,需要这个系统提供维护数据一致性相关的功能。
Hawq

a relational database that runs atop of HDFS
原来Greenplum Database中的存储是本地磁盘,现在改成HDFS,原来Greenplum Database的单节点的RDBMS只充当execution engine的功能,不再充当storage功能。
Query执行通过Greenplum Database的parallel execution engine(不再使用MR),每次查询开始把数据从HDFS中导入到Greenplum database,执行过程中通过内存交换数据而非MapReduce那样每次任务结束都写磁盘。
Hawq提供一个Universal Catalog Service管理分布在各个RDBMS节点的数据。
GP特有的cost-based parallel query optimizer and planner是它的一大优势,也是目前其他大多数的产品中没有的。它能够帮用户选出该SQL最高效的执行顺序。
使用Greenplum Database充当执行引擎的好处:标准SQL兼容(correlated sub query, window functions, rollups, cube, scalar and aggregate function);支持ACID事务;JDBC/ODBC支持;JOIN顺序优化和索引支持(查询优化器);支持row/column两种存储格式。
GPXF (Greenplum Extension Framework) 使得Hawq能够读取存储在HDFS上的任何格式的数据(delimited text, sequence files, protobuf and avro)以及存储在Cassandra, Mongodb, Isilon, Atom, MapR, Lustre, GPFS中的数据,无非就是多开发个读取接口。EMC是存储出身,肯定是希望这个analytic stack能够接入更多的存储产品,特别是他们卖的东西。
底层的HDFS需要支持trancate语义(https://issues.apache.org/jira/browse/HDFS-3107)和native C interface(不是JNI的,JNI的不适合大规模并行查询,所以应该hawq自己实现了一个基于C的RPC通信接口,与NameNode和DataNode直接通信)。所以说Hawq底层的HDFS跟Apache版本的到底有多大区别,我也不知道。
支持In-Database analytics ( http://madlib.net/ )
可以在Hawq内执行除了Query以外的分析任务,例如analytic functions(standard deviation, variance等)和off-the-shelf analytic package
支持数据挖掘算法:principal components analysis (PCA), enhanced support vector machines (SVM), linear models
性能相关:

Scott Yara(Greenplum老大) 公开承认hawq比pure Greenplum database要慢。这么做的目的无非就是更好的利用HDFS的可扩展性,统一存储管理。
和其他sql on hadoop产品的性能对比方面,hawq在group by和join操作上与其他方案相比优势明显,前提是数据量不是特别大。(是不是因为数据导入的时候partition做的好呢,是不是拿load的时间换group by/join的时间呢?)
http://www.dataintoresults.com/2013/09/big-data-benchmark-impala-vs-hawq-vs-hive/

不过hawq和hadapt都说明了一个问题:就是unified analytic platform的重要性。

从商业产品来看,大数据分析产品主要有:

Teradata/Aster Data
EMC/Greenplum/Hawq
HP/Vertica 列存数据仓库
SAP/HANA 内存分析
Google/BigQuery 典型的Analysis as a Service
Amazon/Redshif 和AWS结合比较紧密
而传统软件厂商IBM, Oracle, Microsoft也都有产品,不过从技术的角度对后面这些公司的产品了解不多。
说完数据仓库相关产品,我们也顺便看看机器学习相关产品。机器学习不像SQL那么普遍,但是非常重要。我所知道的目前互联网公司做机器学习的系统是这样的:
(1) twitter基于pig做Machine Learning

http://www.umiacs.umd.edu/~jimmylin/publications/Lin_Kolcz_SIGMOD2012.pdf

在Hadoop/MapReduce基础上,通过Pig扩展,使该平台具有机器学习处理能力
特征抽取通过UDF实现
单个学习单元的内部循环封装在Pig Storage Function中
预测是根据学习训练的模型,结合UDF实现
(2) 不过目前互联网公司大多使用Hadoop做feature selection,然后对于不同的问题采用两种思路:

采样数据,然后跑单机模型。因为很多机器学习算法是非常不容易并行化的,所以在全量数据的子集上面跑单机模型。
基于MPI开发大规模并行的机器学习算法。
(3) Spark是个非常适合迭代型机器学习算法的计算模型和框架,而且Ecosystem非常完备(Shark,BlinkDB,MLbase)。特别是基于Spark的机器学习算法库MLbase(http://www.cs.berkeley.edu/~ameet/mlbase.pdf)更是给机器学习算法大规模应用提供了帮助。

由于Mahout是MR上的machine learning库,但是底层的MR天然不适合密集迭代计算的机器学习算法,导致Mahout的应用并不是很广泛。但是Spark却是非常适合迭代机器学习算法,那么MLbase的重要性就非常明显了。

目前Berkeley的教授们已经搞了一个公司叫databricks来做Spark/Shark的商业化,我是非常看好Spark的前途的。