在每个 TableEnvironment 中,TableConfig 提供用于当前会话的配置项。
对于常见或者重要的配置项,TableConfig 提供带有详细注释的 getters 和 setters 方法。
对于更加高级的配置,用户可以直接访问底层的 key-value 配置项。以下章节列举了所有可用于调整 Flink Table 和 SQL API 程序的配置项。
注意 因为配置项会在执行操作的不同时间点被读取,所以推荐在实例化 TableEnvironment 后尽早地设置配置项。
// 实例化表环境
TableEnvironment tEnv = ...
// 接入flink配置
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置键值选项
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
注意 目前,key-value 配置项仅被 Blink planner 支持。
执行配置
以下选项可用于优化查询执行的性能。
Key |
Default |
Type |
Description |
table.exec.async-lookup.buffer-capacity
|
100 |
Integer |
The max number of async i/o operation that the async lookup join can trigger. 异步查找连接可以触发的最大异步操作的操作数 |
table.exec.async-lookup.timeout
|
3 min |
Duration |
异步操作的异步超时。 |
table.exec.mini-batch.allow-latency
|
0 ms |
Duration |
最大的延迟可以用于小批量缓冲输入记录。MiniBatch是一种缓冲输入记录以减少状态访问的优化。在允许的延迟间隔和达到最大缓冲记录数时触发迷你批处理。注意:如果table.exec.mini-batch.enabled设置为true,则其值必须大于零。 |
table.exec.mini-batch.enabled
|
false |
Boolean |
指定是否启用小型批量优化。MiniBatch是一种缓冲输入记录以减少状态访问的优化。默认情况下被禁用。要启用此功能,用户应该将此配置设置为true。注意:如果启用了小批量批处理,则必须设置“table.exec.mini-batch.allow-latency”和“table.exec.mini-batch.size”。 |
table.exec.mini-batch.size
|
-1 |
Long |
小批量的输入记录的最大数量可以被缓冲。MiniBatch是一种缓冲输入记录以减少状态访问的优化。在允许的延迟间隔和达到最大缓冲记录数时触发迷你批处理。注意:小批量处理目前仅适用于无窗口的聚合。如果table.exec.mini-batch.enabled设置为true,则其值必须为正。 |
table.exec.resource.default-parallelism
|
-1 |
Integer |
设置所有要使用并行实例运行的操作符(如聚合、连接、过滤器)的默认并行性。此配置比流线型执行环境的并行性具有更高的优先级(实际上,此配置覆盖了流线型执行环境的并行性)。值为-1表示没有设置默认的并行性,然后它将返回以使用流执行环境的并行性。 |
table.exec.sink.not-null-enforcer
|
ERROR |
Enum Possible values: [ERROR, DROP] |
The NOT NULL column constraint on a table enforces that null values can't be inserted into the table. Flink supports 'error' (default) and 'drop' enforcement behavior. By default, Flink will check values and throw runtime exception when null values writing into NOT NULL columns. Users can change the behavior to 'drop' to silently drop such records without throwing exception. 表上的NOTNULL列约束强制执行不能将空值插入到表中。Flink支持“错误”(默认)和“删除”强制执行行为。默认情况下,当Nunl值写入NULL列时,Flink列将检查值并抛出运行时异常。用户可以将行为更改为“删除”,以无声地删除这些记录而不抛出异常。 |
table.exec.source.cdc-events-duplicate
|
false |
Boolean |
Indicates whether the CDC (Change Data Capture) sources in the job will produce duplicate change events that requires the framework to deduplicate and get consistent result. CDC source refers to the source that produces full change events, including INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE, for example Kafka source with Debezium format. The value of this configuration is false by default. 指示作业中的CDC(更改数据捕获)源是否会产生重复的更改事件,这需要框架删除重复数据并获得一致的结果。CDC源是指产生完整更改事件的源,包括插入/UPDATE_BEFORE/UPDATE_AFTER/删除,例如使用Debezium格式的Kafka源。此配置的值在默认情况下为假的。但是,很常见的情况是,有重复的更改事件。因为通常CDC工具(例如Debezium)在故障转移发生时至少可以交付一次。因此,在异常情况下,Debezium可能会将卡夫卡重复的更改事件传递给卡夫卡,Flink将得到重复的事件。这可能会导致Flink查询得到错误的结果或意外的异常。因此,如果CDC工具至少交付一次,建议打开此配置。启用此配置需要在CDC源代码上定义主密钥。主键将用于删除重复的更改事件,并生成规范化的更改日志流,代价是另外一个有状态操作符。 |
table.exec.source.idle-timeout
|
0 ms |
Duration |
当源在超时时间内没有接收到任何元素时,它将被标记为暂时空闲。这允许下游任务推进它们的水印,而不需要在它空闲时等待来自该来源的水印。默认值为0,这意味着未启用检测源闲置。 |
table.exec.state.ttl
|
0 ms |
Duration |
指定保留空闲状态(即未更新的状态)的最小时间间隔。状态在空闲时间之前永远不会被清除,并且将在空闲后的某个时间被清除。默认值永远不会清理该状态。注意:清理状态需要额外的簿记费用。默认值为0,这表示它永远无法清理状态。 |
优化器配置
以下配置可以用于调整查询优化器的行为以获得更好的执行计划。
Key |
Default |
Type |
Description |
table.optimizer.agg-phase-strategy
|
"AUTO" |
String |
Strategy for aggregate phase. Only AUTO, TWO_PHASE or ONE_PHASE can be set. AUTO: No special enforcer for aggregate stage. Whether to choose two stage aggregate or one stage aggregate depends on cost. TWO_PHASE: Enforce to use two stage aggregate which has localAggregate and globalAggregate. Note that if aggregate call does not support optimize into two phase, we will still use one stage aggregate. ONE_PHASE: Enforce to use one stage aggregate which only has CompleteGlobalAggregate. 汇总阶段的策略。只能设置自动、TWO_PHASE或ONE_PHASE。自动操作:聚合阶段没有特殊的执行者。是选择两个阶段聚合还是选择一个阶段聚合取决于成本。TWO_PHASE:强制使用具有局部聚合和全局聚合的两个阶段聚合。注意,如果聚合调用不支持进入两个阶段的优化,我们仍然将使用一个阶段的聚合。ONE_PHASE:强制使用只有完全全局聚合的一个阶段聚合。 |
table.optimizer.distinct-agg.split.bucket-num
|
1024 |
Integer |
Configure the number of buckets when splitting distinct aggregation. The number is used in the first level aggregation to calculate a bucket key 'hash_code(distinct_key) % BUCKET_NUM' which is used as an additional group key after splitting. 在拆分不同的聚合时配置桶的数量。该数字在第一级聚合中用于计算桶密钥“hash_code(distinct_key)%BUCKET_NUM”,该键在拆分后用作额外的组密钥。 |
table.optimizer.distinct-agg.split.enabled
|
false |
Boolean |
Tells the optimizer whether to split distinct aggregation (e.g. COUNT(DISTINCT col), SUM(DISTINCT col)) into two level. The first aggregation is shuffled by an additional key which is calculated using the hashcode of distinct_key and number of buckets. This optimization is very useful when there is data skew in distinct aggregation and gives the ability to scale-up the job. Default is false. 告诉优化器是否将不同的聚合(例如计数(不同col)、SUM(不同col))划分为两个级别。第一个聚合由一个附加密钥打乱,该键使用distinct_key的散列码和桶数计算。当在不同的聚合中存在数据倾斜时,这种优化非常有用,并提供了扩展工作的能力。默认值为假。 |
table.optimizer.join-reorder-enabled
|
false |
Boolean |
Enables join reorder in optimizer. Default is disabled. 在优化器中启用连接重新排序。默认值已禁用。 |
table.optimizer.reuse-source-enabled
|
true |
Boolean |
When it is true, the optimizer will try to find out duplicated table sources and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true. 当它成立时,优化器将尝试找出重复的表源并重用它们。这只有在table.optimizer.reuse-sub-plan-enabled为真时才能工作。 |
table.optimizer.reuse-sub-plan-enabled
|
true |
Boolean |
When it is true, the optimizer will try to find out duplicated sub-plans and reuse them. 当它成立时,优化器将尝试找出重复的子计划并重用它们。 |
table.optimizer.source.predicate-pushdown-enabled
|
true |
Boolean |
When it is true, the optimizer will push down predicates into the FilterableTableSource. Default value is true. 当它为真时,优化器将把谓词下推到过滤器表源中。默认值为true。 |
Planner 配置
以下配置可以用于调整 planner 的行为。
Key |
Default |
Type |
Description |
table.dynamic-table-options.enabled
|
false |
Boolean |
启用或禁用用于动态指定表选项的选项提示,如果禁用,如果指定了任何选项提示,则将抛出异常 |
table.generated-code.max-length
|
64000 |
Integer |
指定生成的代码将被分为子函数调用的阈值。Java的最大方法长度为64KB。此设置允许更细的粒度 |
table.local-time-zone
|
"default" |
String |
本地时区定义了当前会话时区ID。它用于用本地时间区</代码>转换为/从<代码>时间戳。在内部,具有局部时区的时戳总是在UTC时区中表示。但是,当转换为不包括时区的数据类型时(例如时间戳、TIME或简单的字符串)时,将在转换期间使用会话时区。选项的输入要么是一个缩写,如“PST”,一个全名,如“美国/Los_Angeles”,要么是一个自定义时区id,如“GMT-8:00”。 |
table.sql-dialect
|
"default" |
String |
SQL方言定义了如何解析SQL查询。不同的SQL方言可能支持不同的SQL语法。目前支持的方言有:default and hive |
查询配置
Table API and SQL queries have the same semantics regardless whether their input is a finite set of rows or an unbounded stream of table changes. In many cases, continuous queries on streaming input are able to compute accurate results that are identical to offline computed results. However, for some continuous queries you have to limit the size of the state they are maintaining in order to avoid to run out of storage while ingesting an unbounded stream of input. It depends on the characteristics of the input data and the query itself whether you need to limit the state size and whether and how it affects the accuracy of the computed results.
Flink’s Table API and SQL interface provide parameters to tune the accuracy and resource consumption of continuous queries. The parameters are specified via a TableConfig object, which can be obtained from the TableEnvironment.
表API和SQL查询具有相同的语义,无论它们的输入是有限的行集还是无界的表更改流。在许多情况下,对流输入的连续查询能够计算出与离线计算结果相同的准确结果。但是,对于一些连续的查询,您必须限制它们所保持的状态的大小,以避免在摄入无限的输入流时耗尽存储空间。这取决于输入数据的特征和查询本身,您是否需要限制状态大小,以及它是否以及如何影响计算结果的准确性。
Flink的表API和SQL接口提供了参数来调整连续查询的准确性和资源消耗的情况。参数通过表配置对象指定,该对象可以从表环境中获得。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//从表格环境中获取查询配置nt
TableConfig tConfig = tableEnv.getConfig();
//设置查询参数
tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
// define query
Table result = ...
// create TableSink
TableSink<Row> sink = ...
// register TableSink
tableEnv.registerTableSink(
"outputTable", // table name
new String[]{...}, // field names
new TypeInformation[]{...}, // field types
sink); // table sink
//sink
result.executeInsert("outputTable");
// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class);
状态保留时间
许多查询会在一个或多个关键属性上聚合或连接记录。当在流上执行这样的查询时,连续查询需要收集记录或维护每个键的部分结果。如果输入流的关键域正在演化,即活动键值随时间变化,那么随着观察到越来越多的不同键,连续查询会积累越来越多的状态。然而,键通常在一段时间后变得不活动,它们相应的状态变得陈旧和无用。
例如,以下查询计算每个会话的单击次数。
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
控制Id属性被用作分组键,连续查询为它观察到的每个控制Id维护一个计数。会话Id属性随时间变化,会话Id值只在会话结束之前活动,即在有限的时间内。但是,连续查询不能知道会id的这个属性,并期望每个会id值都可以在任何时间点发生。它为每个观察到的会议i值维护一个计数。因此,随着观察到越来越多的控制id值,查询的总状态大小也在不断增长。
空闲状态保留时间参数定义在删除键之前不更新键状态的时间。对于前面的示例查询,一旦控制id的数量没有在配置的期间进行更新,就会被删除。
通过删除键的状态,连续查询完全忘记了它以前看到过这个键。如果处理一个带有键的记录,其状态以前已被删除,则该记录将被视为它是具有相应键的第一个记录。对于上面的例子,这意味着一个会话id的计数将再次从0开始。’
配置空闲状态保留时间有两个参数:
最小空闲状态保留时间定义了在删除非活动密钥之前至少保持其状态的时间。
最大空闲状态保留时间定义了在删除非活动键的状态最多保留的时间。
参数指定如下:
TableConfig tConfig = ...
//设置空闲状态保留时间:最小=12小时,最大=24小时
tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
分钟时间和最大时间之间的差异必须至少为5分钟。