当前位置: 代码迷 >> 综合 >> Flink 1.11新特性之SQL Hive Streaming简单示例
  详细解决方案

Flink 1.11新特性之SQL Hive Streaming简单示例

热度:54   发布时间:2023-09-11 20:00:33.0

前言

今天本来想搞篇走读StreamingFileSink源码的文章,但是考虑到Flink 1.11版本发布已经有段时间了,于是就放松一下,体验新特性吧。

与1.10版本相比,1.11版本最为显著的一个改进是Hive Integration显著增强,也就是真正意义上实现了基于Hive的流批一体。本文用简单的本地示例来体验Hive Streaming的便利性。

添加相关依赖

测试集群上的Hive版本为1.1.0,Hadoop版本为2.6.0,Kafka版本为1.0.1。

<properties><scala.bin.version>2.11</scala.bin.version><flink.version>1.11.0</flink.version><flink-shaded-hadoop.version>2.6.5-10.0</flink-shaded-hadoop.version><hive.version>1.1.0</hive.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.bin.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.bin.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_${scala.bin.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>${flink-shaded-hadoop.version}</version></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>${hive.version}</version></dependency>

另外,别忘了找到hdfs-site.xml和hive-site.xml,并将其加入项目。

创建执行环境

Flink 1.11的Table/SQL API中,FileSystem Connector是靠一个增强版StreamingFileSink组件实现,在源码中名为StreamingFileWriter。我们知道,只有在checkpoint成功时,StreamingFileSink写入的文件才会由pending状态变成finished状态,从而能够安全地被下游读取。所以,我们一定要打开checkpointing,并设定合理的间隔。

Flink 1.11新特性之SQL Hive Streaming简单示例
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)val tableEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))

注册HiveCatalog

val catalogName = "my_catalog"
val catalog = new HiveCatalog(catalogName,              // catalog name"default",                // default database"/Users/lmagic/develop",  // Hive config (hive-site.xml) directory"1.1.0"                   // Hive version
)
tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)

创建Kafka流表

Kafka topic中存储的是JSON格式的埋点日志,建表时用计算列生成事件时间与水印。1.11版本SQL Kafka Connector的参数相比1.10版本有一定简化。

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka")tableEnv.executeSql("""|CREATE TABLE stream_tmp.analytics_access_log_kafka (|  ts BIGINT,|  userId BIGINT,|  eventType STRING,|  fromType STRING,|  columnType STRING,|  siteId BIGINT,|  grouponId BIGINT,|  partnerId BIGINT,|  merchandiseId BIGINT,|  procTime AS PROCTIME(),|  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),|  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND|) WITH (|  'connector' = 'kafka',|  'topic' = 'ods_analytics_access_log',|  'properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092'|  'properties.group.id' = 'flink_hive_integration_exp_1',|  'scan.startup.mode' = 'latest-offset',|  'format' = 'json',|  'json.fail-on-missing-field' = 'false',|  'json.ignore-parse-errors' = 'true'|)""".stripMargin
)

前面已经注册了HiveCatalog,故在Hive中可以观察到创建的Kafka流表的元数据(注意该表并没有事实上的列)。

hive> DESCRIBE FORMATTED stream_tmp.analytics_access_log_kafka;
OK
# col_name              data_type               comment# Detailed Table Information
Database:               stream_tmp
Owner:                  null
CreateTime:             Wed Jul 15 18:25:09 CST 2020
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Location:               hdfs://sht-bdmq-cls/user/hive/warehouse/stream_tmp.db/analytics_access_log_kafka
Table Type:             MANAGED_TABLE
Table Parameters:flink.connector         kafkaflink.format            jsonflink.json.fail-on-missing-field    falseflink.json.ignore-parse-errors  trueflink.properties.bootstrap.servers  kafka110:9092,kafka111:9092,kafka112:9092flink.properties.group.id   flink_hive_integration_exp_1flink.scan.startup.mode latest-offsetflink.schema.0.data-type    BIGINTflink.schema.0.name     tsflink.schema.1.data-type    BIGINTflink.schema.1.name     userIdflink.schema.10.data-type   TIMESTAMP(3)flink.schema.10.expr    TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss'))flink.schema.10.name    eventTimeflink.schema.2.data-type    VARCHAR(2147483647)flink.schema.2.name     eventType# 略......flink.schema.9.data-type    TIMESTAMP(3) NOT NULLflink.schema.9.expr     PROCTIME()flink.schema.9.name     procTimeflink.schema.watermark.0.rowtime    eventTimeflink.schema.watermark.0.strategy.data-type TIMESTAMP(3)flink.schema.watermark.0.strategy.expr  `eventTime` - INTERVAL '15' SECONDflink.topic             ods_analytics_access_logis_generic              truetransient_lastDdlTime   1594808709# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:serialization.format    1
Time taken: 1.797 seconds, Fetched: 61 row(s)

创建Hive表

Flink SQL提供了兼容HiveQL风格的DDL,指定SqlDialect.HIVE即可(DML兼容还在开发中)。

为了方便观察结果,以下的表采用了天/小时/分钟的三级分区,实际应用中可以不用这样细的粒度(10分钟甚至1小时的分区可能更合适)。

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive")tableEnv.executeSql("""|CREATE TABLE hive_tmp.analytics_access_log_hive (|  ts BIGINT,|  user_id BIGINT,|  event_type STRING,|  from_type STRING,|  column_type STRING,|  site_id BIGINT,|  groupon_id BIGINT,|  partner_id BIGINT,|  merchandise_id BIGINT|) PARTITIONED BY (|  ts_date STRING,|  ts_hour STRING,|  ts_minute STRING|) STORED AS PARQUET|TBLPROPERTIES (|  'sink.partition-commit.trigger' = 'partition-time',|  'sink.partition-commit.delay' = '1 min',|  'sink.partition-commit.policy.kind' = 'metastore,success-file',|  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'|)""".stripMargin
)

Hive表的参数复用了SQL FileSystem Connector的相关参数,与分区提交(partition commit)密切相关。仅就上面出现的4个参数简单解释一下。

  • sink.partition-commit.trigger:触发分区提交的时间特征。默认为processing-time,即处理时间,很显然在有延迟的情况下,可能会造成数据分区错乱。所以这里使用partition-time,即按照分区时间戳(即分区内数据对应的事件时间)来提交。
  • partition.time-extractor.timestamp-pattern:分区时间戳的抽取格式。需要写成yyyy-MM-dd HH:mm:ss的形式,并用Hive表中相应的分区字段做占位符替换。显然,Hive表的分区字段值来自流表中定义好的事件时间,后面会看到。
  • sink.partition-commit.delay:触发分区提交的延迟。在时间特征设为partition-time的情况下,当水印时间戳大于分区创建时间加上此延迟时,分区才会真正提交。此值最好与分区粒度相同,例如若Hive表按1小时分区,此参数可设为1 h,若按10分钟分区,可设为10 min
  • sink.partition-commit.policy.kind:分区提交策略,可以理解为使分区对下游可见的附加操作。metastore表示更新Hive Metastore中的表元数据,success-file则表示在分区内创建_SUCCESS标记文件。

当然,SQL FileSystem Connector的功能并不限于此,还有很大自定义的空间(如可以自定义分区提交策略以合并小文件等)。具体可参见官方文档。

流式写入Hive

注意将流表中的事件时间转化为Hive的分区。

tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
tableEnv.executeSql("""|INSERT INTO hive_tmp.analytics_access_log_hive|SELECT|  ts,userId,eventType,fromType,columnType,siteId,grouponId,partnerId,merchandiseId,|  DATE_FORMAT(eventTime,'yyyy-MM-dd'),|  DATE_FORMAT(eventTime,'HH'),|  DATE_FORMAT(eventTime,'mm')|FROM stream_tmp.analytics_access_log_kafka|WHERE merchandiseId > 0""".stripMargin
)

来观察一下流式Sink的结果吧。

Flink 1.11新特性之SQL Hive Streaming简单示例

上文设定的checkpoint interval是20秒,可以看到,上图中的数据文件恰好是以20秒的间隔写入的。由于并行度为3,所以每次写入会生成3个文件。分区内所有数据写入完毕后,会同时生成_SUCCESS文件。如果是正在写入的分区,则会看到.inprogress文件。

通过Hive查询一下,确定数据的时间无误。

hive> SELECT from_unixtime(min(cast(ts / 1000 AS BIGINT))),from_unixtime(max(cast(ts / 1000 AS BIGINT)))> FROM hive_tmp.analytics_access_log_hive> WHERE ts_date = '2020-07-15' AND ts_hour = '23' AND ts_minute = '23';
OK
2020-07-15 23:23:00 2020-07-15 23:23:59
Time taken: 1.115 seconds, Fetched: 1 row(s)

流式读取Hive

要将Hive表作为流式Source,需要启用dynamic table options,并通过table hints来指定Hive数据流的参数。以下是简单地通过Hive计算商品PV的例子。

tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)val result = tableEnv.sqlQuery("""|SELECT merchandise_id,count(1) AS pv|FROM hive_tmp.analytics_access_log_hive|/*+ OPTIONS(|  'streaming-source.enable' = 'true',|  'streaming-source.monitor-interval' = '1 min',|  'streaming-source.consume-start-offset' = '2020-07-15 23:30:00'|) */|WHERE event_type = 'shtOpenGoodsDetail'|AND ts_date >= '2020-07-15'|GROUP BY merchandise_id|ORDER BY pv DESC LIMIT 10""".stripMargin
)result.toRetractStream[Row].print().setParallelism(1)
streamEnv.execute()

三个table hint参数的含义解释如下。

  • streaming-source.enable:设为true,表示该Hive表可以作为Source。
  • streaming-source.monitor-interval:感知Hive表新增数据的周期,以上设为1分钟。对于分区表而言,则是监控新分区的生成,以增量读取数据。
  • streaming-source.consume-start-offset:开始消费的时间戳,同样需要写成yyyy-MM-dd HH:mm:ss的形式。

更加具体的说明仍然可参见官方文档(吐槽一句,这份文档的Chinglish味道真的太重了=。=

最后,由于SQL语句中有ORDER BY和LIMIT逻辑,所以需要调用toRetractStream()方法转化为回撤流,即可输出结果。

The End

Flink 1.11的Hive Streaming功能大大提高了Hive数仓的实时性,对ETL作业非常有利,同时还能够满足流式持续查询的需求,具有一定的灵活性。

还有事情要做,民那晚安。

  相关解决方案