之前直接搬了1.14的连接器嫁接了一下,呆胶布
HBase 连接器支持读取和写入 HBase 集群。本文档介绍如何使用 HBase 连接器基于 HBase 进行 SQL 查询。
HBase 连接器在 upsert 模式下运行,可以使用 DDL 中定义的主键与外部系统交换更新操作消息。但是主键只能基于 HBase 的 rowkey 字段定义。如果没有声明主键,HBase 连接器默认取 rowkey 作为主键。
1.1 依赖
为了使用HBase连接器,使用构建自动化工具(如Maven或SBT)的项目和使用SQL JAR捆绑包的SQL客户端都需要以下依赖项。
HBase version |
Maven dependency |
1.4.x |
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hbase-1.4_2.11</artifactId> <version>1.12.4</version> </dependency> |
2.2.x |
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hbase-2.2_2.11</artifactId> <version>1.12.4</version> </dependency> |
1.2 如何使用 HBase 表
所有 HBase 表的列簇必须定义为 ROW 类型,字段名对应列簇名(column family),嵌套的字段名对应列限定符名(column qualifier)。用户只需在表结构中声明查询中使用的的列簇和列限定符。除了 ROW 类型的列,剩下的原子数据类型字段(比如,STRING, BIGINT)将被识别为 HBase 的 rowkey,一张表中只能声明一个 rowkey。rowkey 字段的名字可以是任意的,如果是保留关键字,需要用反引号。
- SQL
-- 在 Flink SQL 中注册 HBase 表 "mytable"
CREATE TABLE hTable (
rowkey INT,
family1 ROW<q1 INT>,
family2 ROW<q2 STRING, q3 BIGINT>,
family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
PRIMARY KEY (rowkey) NOT ENFORCED) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'mytable',
'zookeeper.quorum' = 'localhost:2181');
-- 用 ROW(...) 构造函数构造列簇,并往 HBase 表写数据。-- 假设 "T" 的表结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO hTableSELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
-- 从 HBase 表扫描数据
SELECT rowkey, family1, family3.q4, family3.q6 FROM hTable;
-- temporal join HBase 表,将 HBase 表作为维表
SELECT * FROM myTopicLEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctimeON myTopic.key = hTable.rowkey;
1.12 hbase sql connector不支持async查询hbase 1.13支持较好
1.3 连接器参数
参数 |
是否必选 |
默认参数 |
数据类型 |
描述 |
connector |
必选 |
(none) |
String |
指定使用的连接器, 支持的值如下 :
|
table-name |
必选 |
(none) |
String |
连接的 HBase 表名。 |
zookeeper.quorum |
必选 |
(none) |
String |
HBase Zookeeper quorum 信息。 |
???????zookeeper.znode.parent |
可选 |
/hbase |
String |
HBase 集群的 Zookeeper 根目录。 |
???????null-string-literal |
可选 |
null |
String |
当字符串值为 null 时的存储形式,默认存成 "null" 字符串。HBase 的 source 和 sink 的编解码将所有数据类型(除字符串外)将 null 值以空字节来存储。 |
???????sink.buffer-flush.max-size |
可选 |
2mb |
MemorySize |
写入的参数选项。每次写入请求缓存行的最大大小。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。 |
???????sink.buffer-flush.max-rows |
可选 |
1000 |
Integer |
写入的参数选项。 每次写入请求缓存的最大行数。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。 |
???????sink.buffer-flush.interval |
可选 |
1s |
Duration |
写入的参数选项。刷写缓存行的间隔。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。注意:"sink.buffer-flush.max-size" 和 "sink.buffer-flush.max-rows" 同时设置为 "0",刷写选项整个异步处理缓存行为。 |
???????sink.parallelism |
可选 |
(none) |
Integer |
为 HBase sink operator 定义并行度。默认情况下,并行度由框架决定,和链在一起的上游 operator 一样。 |
1.4 数据类型映射表
HBase 以字节数组存储所有数据。在读和写过程中要序列化和反序列化数据。
Flink 的 HBase 连接器利用 HBase(Hadoop) 的工具类 org.apache.hadoop.hbase.util.Bytes 进行字节数组和 Flink 数据类型转换。
Flink 的 HBase 连接器将所有数据类型(除字符串外)null 值编码成空字节。对于字符串类型,null 值的字面值由null-string-literal选项值决定。
数据类型映射表如下:
Flink 数据类型 |
HBase 转换 |
CHAR / VARCHAR / STRING |
byte[] toBytes(String s)String toString(byte[] b) |
BOOLEAN |
byte[] toBytes(boolean b)boolean toBoolean(byte[] b) |
BINARY / VARBINARY |
返回 byte[]。 |
DECIMAL |
byte[] toBytes(BigDecimal v)BigDecimal toBigDecimal(byte[] b) |
TINYINT |
new byte[] { val }bytes[0] // returns first and only byte from bytes |
SMALLINT |
byte[] toBytes(short val)short toShort(byte[] bytes) |
INT |
byte[] toBytes(int val)int toInt(byte[] bytes) |
BIGINT |
byte[] toBytes(long val)long toLong(byte[] bytes) |
FLOAT |
byte[] toBytes(float val)float toFloat(byte[] bytes) |
DOUBLE |
byte[] toBytes(double val)double toDouble(byte[] bytes) |
DATE |
从 1970-01-01 00:00:00 UTC 开始的天数,int 值。 |
TIME |
从 1970-01-01 00:00:00 UTC 开始天的毫秒数,int 值。 |
TIMESTAMP |
从 1970-01-01 00:00:00 UTC 开始的毫秒数,long 值。 |
ARRAY |
不支持 |
MAP / MULTISET |
不支持 |
ROW |
不支持 |
HBase Async Code连接器
2.1 LRU
LRU是一种缓存算法,意思是最近最少使用的数据则被淘汰。在这种策略中,维表数据天然被分为冷数据和热数据,所谓冷数据指的是那些不经常使用的数据,热数据是那些查询频率高的数据。
LRU(Least Recently Used),最近最少使用缓存淘汰算法,认为最近访问过的数据在将来被访问的概率也比较大,当内存达到上限去淘汰那些最近访问较少的数据。
在Flink中做维表关联时,如果维表的数据比较大,无法一次性全部加载到内存中,而在业务上也允许一定数据的延时,那么就可以使用LRU策略加载维表数据。但是如果一条维表数据一直都被缓存命中,这条数据永远都不会被淘汰,这时维表的数据已经发生改变,那么将会在很长时间或者永远都无法更新这条改变,所以需要设置缓存超时时间TTL,当缓存时间超过ttl,会强制性使其失效重新从外部加载进来。接下来介绍两种比较常见的LRU使用:
- LinkedHashMap
- LinkedHashMap是双向链表+hash表的结构,普通的hash表访问是没有顺序的,通过加上元素之间的指向关系保证元素之间的顺序,默认是按照插入顺序的,插入是链表尾部,取数据是链表头部,也就是访问的顺序与插入的顺序是一致的。要想其具有LRU特性,那么就将其改为访问顺序,插入还是在链表尾部,但是数据访问会将其移动达到链表的尾部,那么最近插入或者访问的数据永远都在链表尾部,被访问较少的数据就在链表的头部,给 LinkedHashMap设置一个大小,当数据大小超过该值,就直接从链表头部移除数据。
- LinkedHashMap本身不具有ttl功能,就是无法知晓数据是否过期,可以通过给数据封装一个时间字段insertTimestamp,表示数据加载到内存的时间,当这条记录被命中,首先判断当前时间currentTimestamp与insertTimestamp差值是否达到ttl, 如果达到了就重新从外部存储中查询加载到内存中。
- guava Cache
- google guava下面提供了Cache缓存模块,轻量级,适合做本地缓存,能够做到以下几点:
- a. 可配置本地缓存大小
- b. 可配置缓存过期时间
- c. 可配置淘汰策略
- 非常适用于Flink维表关联LRU策略,使用方式:
cache = CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(600, TimeUnit.MILLISECONDS)
.build();
表示最大缓存容量为10000,数据的过期时间为600s。
2.2 异步IO
这里需要注意的是用到了异步IO(RichAsyncFunction),这个功能的出现就是为了解决与外部系统交互时网络延迟成为系统瓶颈的问题。
在流计算环境中,在查询外部维表时,加入访问是同步进行的,那么整体能力势必受限于外部系统。正是因为异步IO的出现使得访问外部系统可以并发的进行,并且不需要同步等待返回,大大减轻了因为网络等待时间等引起的系统吞吐和延迟问题。
在使用异步IO时,一定要使用异步客户端,如果没有异步客户端可以自己创建线程池模拟异步请求。
其他
除了上述常见的处理方式,还可以通过将维表消息广播出去,或者自定义异步线程池访问维表,甚至可以自己扩展Flink SQL中关联维表的方式直接使用SQL Join的方法关联查询结果。
2.3 LRU读取Hbase
实现思路:
- 使用Flink 异步IO RichAsyncFunction去异步读取hbase的数据,那么需要hbase 客户端支持异步读取,默认hbase客户端是同步,可使用hbase 提供的asynchbase 客户端;
- 初始化一个Cache 并且设置最大缓存容量与数据过期时间;
- 数据读取逻辑:先根据Key从Cache中查询value,如果能够查询到则返回,如果没有查询到结果则使用asynchbase查询数据,并且将查询的结果插入Cache中,然后返回。
Flink异步Async I/O - 愿无违 - 博客园