当前位置: 代码迷 >> 综合 >> Flink 1.12 状态与容错
  详细解决方案

Flink 1.12 状态与容错

热度:97   发布时间:2023-09-18 17:01:56.0

概述

Checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。

参考 Checkpointing 查看如何在 Flink 程序中开启和配置 checkpoint。

保留 Checkpoint

Checkpoint 在默认的情况下仅用于恢复失败的作业,并不保留,当程序取消时 checkpoint 就会被删除。当然,你可以通过配置来保留 checkpoint,这些被保留的 checkpoint 在作业失败或取消时不会被清除。这样,你就可以使用该 checkpoint 来恢复失败的作业。

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

ExternalizedCheckpointCleanup 配置项定义了当作业取消时,对作业 checkpoint 的操作:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。

目录结构

与 savepoints 相似,checkpoint 由元数据文件、数据文件(与 state backend 相关)组成。可通过配置文件中 “state.checkpoints.dir” 配置项来指定元数据文件和数据文件的存储路径,另外也可以在代码中针对单个作业特别指定该配置项。

当前的 checkpoint 目录结构(由 FLINK-8531 引入)如下所示:

/user-defined-checkpoint-dir/{job-id}|+ --shared/+ --taskowned/+ --chk-1/+ --chk-2/+ --chk-3/...

其中 SHARED 目录保存了可能被多个 checkpoint 引用的文件,TASKOWNED 保存了不会被 JobManager 删除的文件,EXCLUSIVE 则保存那些仅被单个 checkpoint 引用的文件。

注意: Checkpoint 目录不是公共 API 的一部分,因此可能在未来的 Release 中进行改变。

通过配置文件全局配置

state.checkpoints.dir: hdfs:///checkpoints/

创建 state backend 对单个作业进行配置

env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));

Checkpoint 与 Savepoint 的区别

Checkpoint 与 savepoints 有一些区别,体现在 checkpoint :

  • 使用 state backend 特定的数据格式,可能以增量方式存储。
  • 不支持 Flink 的特定功能,比如扩缩容。

从保留的 checkpoint 中恢复状态

与 savepoint 一样,作业可以从 checkpoint 的元数据文件恢复运行(savepoint恢复指南)。注意,如果元数据文件中信息不充分,那么 jobmanager 就需要使用相关的数据文件来恢复作业(参考目录结构)。

$ bin/flink run -s :checkpointMetaDataPath [:runArgs]

什么是 Savepoint ? Savepoint 与 Checkpoint 有什么不同?

Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork 或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,…) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(绝对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。

注意: 为了允许程序和 Flink 版本之间的升级,请务必查看以下有关分配算子 ID 的部分 。

从概念上讲, Flink 的 Savepoint 与 Checkpoint 的不同之处类似于传统数据库中的备份与恢复日志之间的差异。 Checkpoint 的主要目的是为意外失败的作业提供恢复机制。 Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互。 作为一种恢复和定期触发的方法,Checkpoint 实现有两个设计目标:i)轻量级创建和 ii)尽可能快地恢复。 可能会利用某些特定的属性来达到这个,例如, 工作代码在执行尝试之间不会改变。 在用户终止作业后,通常会删除 Checkpoint(除非明确配置为保留的 Checkpoint)。

与此相反、Savepoint 由用户创建,拥有和删除。 他们的用例是计划的,手动备份和恢复。 例如,升级 Flink 版本,调整用户逻辑,改变并行度,以及进行红蓝部署等。 当然,Savepoint 必须在作业停止后继续存在。 从概念上讲,Savepoint 的生成,恢复成本可能更高一些,Savepoint 更多地关注可移植性和对前面提到的作业更改的支持。

除去这些概念上的差异,Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式。然而,目前有一个例外,我们可能会在未来引入更多的差异。例外情况是使用 RocksDB 状态后端的增量 Checkpoint。他们使用了一些 RocksDB 内部格式,而不是 Flink 的本机 Savepoint 格式。这使他们成为了与 Savepoint 相比,更轻量级的 Checkpoint 机制的第一个实例。

分配算子 ID

强烈建议你按照本节所述调整你的程序,以便将来能够升级你的程序。主要通过 uid(String) 方法手动指定算子 ID 。这些 ID 将用于恢复每个算子的状态。

DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID

如果不手动指定 ID ,则会自动生成 ID 。只要这些 ID 不变,就可以从 Savepoint 自动恢复。生成的 ID 取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些 ID 。

Savepoint 状态

你可以将 Savepoint 想象为每个有状态的算子保存一个映射“算子 ID ->状态”:

Operator ID | State
------------+------------------------
source-id   | State of StatefulSource
mapper-id   | State of StatefulMapper

在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。

算子

你可以使用命令行客户端来触发 Savepoint触发 Savepoint 并取消作业从 Savepoint 恢复,以及删除 Savepoint

从 Flink 1.2.0 开始,还可以使用 webui 从 Savepoint 恢复

触发 Savepoint

当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过配置默认目标目录或使用触发器命令指定自定义目标目录(参见:targetDirectory参数来控制该目录的位置。

注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统上的位置。

以 FsStateBackend 或 RocksDBStateBackend 为例:

# Savepoint 目标目录
/savepoint/# Savepoint 目录
/savepoint/savepoint-:shortjobid-:savepointid/# Savepoint 文件包含 Checkpoint元数据
/savepoint/savepoint-:shortjobid-:savepointid/_metadata# Savepoint 状态
/savepoint/savepoint-:shortjobid-:savepointid/...

注意: 虽然看起来好像可以移动 Savepoint ,但由于 _metadata 中保存的是绝对路径,因此暂时不支持。 请按照FLINK-5778了解取消此限制的进度。

注意: 不建议移动或删除正在运行作业的最后一个 Savepoint ,因为这可能会干扰故障恢复。因此,Savepoint 对精确一次的接收器有副作用,为了确保精确一次的语义,如果在最后一个 Savepoint 之后没有 Checkpoint ,那么将使用 Savepoint 进行恢复。

触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]

这将触发 ID 为 :jobId 的作业的 Savepoint,并返回创建的 Savepoint 路径。 你需要此路径来还原和删除 Savepoint 。

使用 YARN 触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

这将触发 ID 为 :jobId 和 YARN 应用程序 ID :yarnAppId 的作业的 Savepoint,并返回创建的 Savepoint 的路径。

使用 Savepoint 取消作业

$ bin/flink cancel -s [:targetDirectory] :jobId

这将自动触发 ID 为 :jobid 的作业的 Savepoint,并取消该作业。此外,你可以指定一个目标文件系统目录来存储 Savepoint 。该目录需要能被 JobManager(s) 和 TaskManager(s) 访问。

从 Savepoint 恢复

$ bin/flink run -s :savepointPath [:runArgs]

这将提交作业并指定要从中恢复的 Savepoint 。 你可以给出 Savepoint 目录或 _metadata 文件的路径。

跳过无法映射的状态恢复

默认情况下,resume 操作将尝试将 Savepoint 的所有状态映射回你要还原的程序。 如果删除了运算符,则可以通过 --allowNonRestoredState(short:-n)选项跳过无法映射到新程序的状态:

$ bin/flink run -s :savepointPath -n [:runArgs]

删除 Savepoint

$ bin/flink savepoint -d :savepointPath

这将删除存储在 :savepointPath 中的 Savepoint。

请注意,还可以通过常规文件系统操作手动删除 Savepoint ,而不会影响其他 Savepoint 或 Checkpoint(请记住,每个 Savepoint 都是自包含的)。 在 Flink 1.2 之前,使用上面的 Savepoint 命令执行是一个更乏味的任务。

配置

你可以通过 state.savepoint.dir 配置 savepoint 的默认目录。 触发 savepoint 时,将使用此目录来存储 savepoint。 你可以通过使用触发器命令指定自定义目标目录来覆盖缺省值(请参阅:targetDirectory参数)。

# 默认 Savepoint 目标目录
state.savepoints.dir: hdfs:///flink/savepoints

如果既未配置缺省值也未指定自定义目标目录,则触发 Savepoint 将失败。

注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 可访问的位置,例如,分布式文件系统上的位置。

F.A.Q

我应该为我作业中的所有算子分配 ID 吗?

根据经验,是的。 严格来说,仅通过 uid 方法给有状态算子分配 ID 就足够了。Savepoint 仅包含这些有状态算子的状态,无状态算子不是 Savepoint 的一部分。

在实践中,建议给所有算子分配 ID,因为 Flink 的一些内置算子(如 Window 算子)也是有状态的,而内置算子是否有状态并不很明显。 如果你完全确定算子是无状态的,则可以跳过 uid 方法。

如果我在作业中添加一个需要状态的新算子,会发生什么?

当你向作业添加新算子时,它将在没有任何状态的情况下进行初始化。 Savepoint 包含每个有状态算子的状态。 无状态算子根本不是 Savepoint 的一部分。 新算子的行为类似于无状态算子。

如果从作业中删除有状态的算子会发生什么?

默认情况下,从 Savepoint 恢复时将尝试将所有状态分配给新作业。如果有状态算子被删除,则无法从 Savepoint 恢复。

你可以通过使用 run 命令设置 --allowNonRestoredState (简称:-n )来允许删除有状态算子:

$ bin/flink run -s :savepointPath -n [:runArgs]

如果我在作业中重新排序有状态算子,会发生什么?

如果给这些算子分配了 ID,它们将像往常一样恢复。

如果没有分配 ID ,则有状态操作符自动生成的 ID 很可能在重新排序后发生更改。这将导致你无法从以前的 Savepoint 恢复。

如果我添加、删除或重新排序作业中没有状态的算子,会发生什么?

如果将 ID 分配给有状态操作符,则无状态操作符不会影响 Savepoint 恢复。

如果没有分配 ID ,则有状态操作符自动生成的 ID 很可能在重新排序后发生更改。这将导致你无法从以前的Savepoint 恢复。

当我在恢复时改变程序的并行度时会发生什么?

如果 Savepoint 是用 Flink >= 1.2.0 触发的,并且没有使用像 Checkpointed 这样的不推荐的状态API,那么你可以简单地从 Savepoint 恢复程序并指定新的并行度。

如果你正在从 Flink < 1.2.0 触发的 Savepoint 恢复,或者使用现在已经废弃的 api,那么你首先必须将作业和 Savepoint 迁移到 Flink >= 1.2.0,然后才能更改并行度。参见升级作业和Flink版本指南。

我可以将 savepoint 文件移动到稳定存储上吗?

这个问题的快速答案目前是“否”,因为元数据文件由于技术原因将稳定存储上的文件作为绝对路径引用。 更长的答案是:如果你因某种原因必须移动文件,那么有两个潜在的方法作为解决方法。 首先,更简单但可能更危险,你可以使用编辑器在元数据文件中查找旧路径并将其替换为新路径。 其次,你可以使用这个类 SavepointV2Serializer作为以新路径以编程方式读取,操作和重写元数据文件的起点。

可用的 State Backends

Flink 内置了以下这些开箱即用的 state backends :

  • FsStateBackend
  • RocksDBStateBackend

FsStateBackend

FsStateBackend 需要配置一个文件系统的 URL(类型、地址、路径),例如:”hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”。

FsStateBackend 将正在运行中的状态数据保存在 TaskManager 的内存中。CheckPoint 时,将状态快照写入到配置的文件系统目录中。 少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其写入到 CheckPoint 的元数据文件中)。

FsStateBackend 默认使用异步快照来防止 CheckPoint 写状态时对数据处理造成阻塞。 用户可以在实例化 FsStateBackend 的时候,将相应布尔类型的构造参数设置为 false 来关闭异步快照,例如:

new FsStateBackend(path, false);

FsStateBackend 适用场景:

  • 状态比较大、窗口比较长、key/value 状态比较大的 Job。
  • 所有高可用的场景。

建议同时将 managed memory 设为0,以保证将最大限度的内存分配给 JVM 上的用户代码。

RocksDBStateBackend

RocksDBStateBackend 需要配置一个文件系统的 URL (类型、地址、路径),例如:”hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”。

RocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 CheckPoint 时,整个 RocksDB 数据库被 checkpoint 到配置的文件系统目录中。 少量的元数据信息存储到 JobManager 的内存中(高可用模式下,将其存储到 CheckPoint 的元数据文件中)。

RocksDBStateBackend 只支持异步快照。

RocksDBStateBackend 的限制:

  • 由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持 2^31 字节。 重要信息: RocksDB 合并操作的状态(例如:ListState)累积数据量大小可以超过 2^31 字节,但是会在下一次获取数据时失败。这是当前 RocksDB JNI 的限制。

RocksDBStateBackend 的适用场景:

  • 状态非常大、窗口非常长、key/value 状态非常大的 Job。
  • 所有高可用的场景。

注意,你可以保留的状态大小仅受磁盘空间的限制。与状态存储在内存中的 FsStateBackend 相比,RocksDBStateBackend 允许存储非常大的状态。 然而,这也意味着使用 RocksDBStateBackend 将会使应用程序的最大吞吐量降低。 所有的读写都必须序列化、反序列化操作,这个比基于堆内存的 state backend 的效率要低很多。

请同时参考 Task Executor 内存配置 中关于 RocksDBStateBackend 的建议。

RocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Backend (见 这里)。

可以使用一些 RocksDB 的本地指标(metrics),但默认是关闭的。你能在 这里 找到关于 RocksDB 本地指标的文档。

The total memory amount of RocksDB instance(s) per slot can also be bounded, please refer to documentation here for details.

Choose The Right State Backend

目前,Flink的保存点二进制格式是特定于状态后端的。使用另一个状态后端获取的保存点不能使用另一个后端恢复,并且您应该仔细考虑在生产之前使用哪一个后端。

在决定FSStateBend和RocksDB之间的选择时,需要在性能和可伸缩性之间进行选择。fsstatebend速度非常快,因为每个状态访问和更新都对Java堆上的对象进行操作;但是,状态大小受集群内可用内存的限制。另一方面,RocksDB可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照的状态后端。但是,每个状态访问和更新都需要(反)序列化并可能从磁盘读取数据,这会导致平均性能比内存状态后端慢一个数量级。

设置 State Backend

如果没有明确指定,将使用 jobmanager 做为默认的 state backend。你能在 flink-conf.yaml 中为所有 Job 设置其他默认的 State Backend。 每一个 Job 的 state backend 配置会覆盖默认的 state backend 配置,如下所示:

设置每个 Job 的 State Backend

StreamExecutionEnvironment 可以对每个 Job 的 State Backend 进行设置,如下所示:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

如果你想在 IDE 中使用 RocksDBStateBackend,或者需要在作业中通过编程方式动态配置 RocksDBStateBackend,必须添加以下依赖到 Flink 项目中。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>1.12.3</version><scope>provided</scope>
</dependency>

注意: 由于 RocksDB 是 Flink 默认分发包的一部分,所以如果你没在代码中使用 RocksDB,则不需要添加此依赖。而且可以在 flink-conf.yaml 文件中通过 state.backend 配置 State Backend,以及更多的 checkpointing 和 RocksDB 特定的 参数。

设置默认的(全局的) State Backend

在 flink-conf.yaml 可以通过键 state.backend 设置默认的 State Backend。

可选值包括 filesystem (FsStateBackend)、rocksdb (RocksDBStateBackend), 或使用实现了 state backend 工厂 StateBackendFactory 的类的全限定类名, 例如: RocksDBStateBackend 对应为 org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

state.checkpoints.dir 选项指定了所有 State Backend 写 CheckPoint 数据和写元数据文件的目录。 你能在 这里 找到关于 CheckPoint 目录结构的详细信息。

配置文件的部分示例如下所示:

# 用于存储 operator state 快照的 State Backendstate.backend: filesystem# 存储快照的目录state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

RocksDB State Backend 进阶

增量快照

RocksDBStateBackend 支持增量快照。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。

一个增量快照是基于(通常多个)前序快照构建的。由于 RocksDB 内部存在 compaction 机制对 sst 文件进行合并,Flink 的增量快照也会定期重新设立起点(rebase),因此增量链条不会一直增长,旧快照包含的文件也会逐渐过期并被自动清理。

和基于全量快照的恢复时间相比,如果网络带宽是瓶颈,那么基于增量快照恢复可能会消耗更多时间,因为增量快照包含的 sst 文件之间可能存在数据重叠导致需要下载的数据量变大;而当 CPU 或者 IO 是瓶颈的时候,基于增量快照恢复会更快,因为从增量快照恢复不需要解析 Flink 的统一快照格式来重建本地的 RocksDB 数据表,而是可以直接基于 sst 文件加载。

虽然状态数据量很大时我们推荐使用增量快照,但这并不是默认的快照机制,您需要通过下述配置手动开启该功能:

  • 在 flink-conf.yaml 中设置:state.backend.incremental: true 或者
  • 在代码中按照右侧方式配置(来覆盖默认配置):RocksDBStateBackend backend = new RocksDBStateBackend(filebackend, true);

需要注意的是,一旦启用了增量快照,网页上展示的 Checkpointed Data Size 只代表增量上传的数据量,而不是一次快照的完整数据量。

内存管理

Flink 致力于控制整个进程的内存消耗,以确保 Flink 任务管理器(TaskManager)有良好的内存使用,从而既不会在容器(Docker/Kubernetes, Yarn等)环境中由于内存超用被杀掉,也不会因为内存利用率过低导致不必要的数据落盘或是缓存命中率下降,致使性能下降。

为了达到上述目标,Flink 默认将 RocksDB 的可用内存配置为任务管理器的单槽(per-slot)托管内存量。这将为大多数应用程序提供良好的开箱即用体验,即大多数应用程序不需要调整 RocksDB 配置,简单的增加 Flink 的托管内存即可改善内存相关性能问题。

当然,您也可以选择不使用 Flink 自带的内存管理,而是手动为 RocksDB 的每个列族(ColumnFamily)分配内存(每个算子的每个 state 都对应一个列族)。这为专业用户提供了对 RocksDB 进行更细粒度控制的途径,但同时也意味着用户需要自行保证总内存消耗不会超过(尤其是容器)环境的限制。请参阅 large state tuning 了解有关大状态数据性能调优的一些指导原则。

RocksDB 使用托管内存

这个功能默认打开,并且可以通过 state.backend.rocksdb.memory.managed 配置项控制。

Flink 并不直接控制 RocksDB 的 native 内存分配,而是通过配置 RocksDB 来确保其使用的内存正好与 Flink 的托管内存预算相同。这是在任务槽(per-slot)级别上完成的(托管内存以任务槽为粒度计算)。

为了设置 RocksDB 实例的总内存使用量,Flink 对同一个任务槽上的所有 RocksDB 实例使用共享的 cache 以及 write buffer manager。 共享 cache 将对 RocksDB 中内存消耗的三个主要来源(块缓存、索引和bloom过滤器、MemTables)设置上限。

Flink还提供了两个参数来控制写路径(MemTable)和读路径(索引及过滤器,读缓存)之间的内存分配。当您看到 RocksDB 由于缺少写缓冲内存(频繁刷新)或读缓存未命中而性能不佳时,可以使用这些参数调整读写间的内存分配。

  • state.backend.rocksdb.memory.write-buffer-ratio,默认值 0.5,即 50% 的给定内存会分配给写缓冲区使用。
  • state.backend.rocksdb.memory.high-prio-pool-ratio,默认值 0.1,即 10% 的 block cache 内存会优先分配给索引及过滤器。 我们强烈建议不要将此值设置为零,以防止索引和过滤器被频繁踢出缓存而导致性能问题。此外,我们默认将L0级的过滤器和索引将被固定到缓存中以提高性能,更多详细信息请参阅 RocksDB 文档。

注意 上述机制开启时将覆盖用户在 PredefinedOptions 和 RocksDBOptionsFactory 中对 block cache 和 write buffer 进行的配置。

注意 仅面向专业用户:若要手动控制内存,可以将 state.backend.rocksdb.memory.managed 设置为 false,并通过 ColumnFamilyOptions 配置 RocksDB。 或者可以复用上述 cache/write-buffer-manager 机制,但将内存大小设置为与 Flink 的托管内存大小无关的固定大小(通过 state.backend.rocksdb.memory.fixed-per-slot 选项)。 注意在这两种情况下,用户都需要确保在 JVM 之外有足够的内存可供 RocksDB 使用。

计时器(内存 vs. RocksDB)

计时器(Timer)用于安排稍后的操作(基于事件时间或处理时间),例如触发窗口或回调 ProcessFunction

当选择 RocksDBStateBackend 时,默认情况下计时器也存储在 RocksDB 中。这是一种健壮且可扩展的方式,允许应用程序使用很多个计时器。另一方面,在 RocksDB 中维护计时器会有一定的成本,因此 Flink 也提供了将计时器存储在 JVM 堆上而使用 RocksDB 存储其他状态的选项。当计时器数量较少时,基于堆的计时器可以有更好的性能。

您可以通过将 state.backend.rocksdb.timer-service.factory 配置项设置为 heap(而不是默认的 rocksdb)来将计时器存储在堆上。

注意 在 RocksDBStateBackend 中使用基于堆的计时器的组合当前不支持计时器状态的异步快照。其他状态(如 keyed state)可以被异步快照。

开启 RocksDB 原生监控指标

您可以选择使用 Flink 的监控指标系统来汇报 RocksDB 的原生指标,并且可以选择性的指定特定指标进行汇报。 请参阅 configuration docs 了解更多详情。

注意: 启用 RocksDB 的原生指标可能会对应用程序的性能产生负面影响。

列族(ColumnFamily)级别的预定义选项

注意 在引入 RocksDB 使用托管内存 功能后,此机制应限于在专家调优故障处理中使用。

使用预定义选项,用户可以在每个 RocksDB 列族上应用一些预定义的配置,例如配置内存使用、线程、Compaction 设置等。目前每个算子的每个状态都在 RocksDB 中有专门的一个列族存储。

有两种方法可以选择要应用的预定义选项:

  • 通过 state.backend.rocksdb.predefined-options 配置项将选项名称设置进 flink-conf.yaml 。
  • 通过程序设置:RocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM) 。

该选项的默认值是 DEFAULT ,对应 PredefinedOptions.DEFAULT 。

通过 RocksDBOptionsFactory 配置 RocksDB 选项

注意 在引入 RocksDB 使用托管内存 功能后,此机制应限于在专家调优故障处理中使用。

您也可以通过配置一个 RocksDBOptionsFactory 来手动控制 RocksDB 的选项。此机制使您可以对列族的设置进行细粒度控制,例如内存使用、线程、Compaction 设置等。目前每个算子的每个状态都在 RocksDB 中有专门的一个列族存储。

有两种方法可以将 RocksDBOptionsFactory 传递给 RocksDBStateBackend:

  • 通过 state.backend.rocksdb.options-factory 选项将工厂实现类的名称设置到flink-conf.yaml 。

  • 通过程序设置,例如 RocksDBStateBackend.setRocksDBOptions(new MyOptionsFactory()); 。

注意 通过程序设置的 RocksDBOptionsFactory 将覆盖 flink-conf.yaml 配置文件的设置,且 RocksDBOptionsFactory 设置的优先级高于预定义选项(PredefinedOptions)。

注意 RocksDB是一个本地库,它直接从进程分配内存, 而不是从JVM分配内存。分配给 RocksDB 的任何内存都必须被考虑在内,通常需要将这部分内存从任务管理器(TaskManager)的JVM堆中减去。 不这样做可能会导致JVM进程由于分配的内存超过申请值而被 YARN/Mesos 等资源管理框架终止。

从 flink-conf.yaml 中读取列族选项

一个实现了 ConfigurableRocksDBOptionsFactory 接口的 RocksDBOptionsFactory 可以直接从配置文件(flink-conf.yaml)中读取设定。

state.backend.rocksdb.options-factory 的默认配置是 org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory,它默认会将 这里定义 的所有配置项全部加载。 因此您可以简单的通过关闭 RocksDB 使用托管内存的功能并将需要的设置选项加入配置文件来配置底层的列族选项。

下面是自定义 ConfigurableRocksDBOptionsFactory 的一个示例 (开发完成后,请将您的实现类全名设置到 state.backend.rocksdb.options-factory).

public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {private static final long DEFAULT_SIZE = 256 * 1024 * 1024;  // 256 MBprivate long blockCacheSize = DEFAULT_SIZE;@Overridepublic DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {return currentOptions.setIncreaseParallelism(4).setUseFsync(false);}@Overridepublic ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {return currentOptions.setTableFormatConfig(new BlockBasedTableConfig().setBlockCacheSize(blockCacheSize).setBlockSize(128 * 1024));            // 128 KB}@Overridepublic RocksDBOptionsFactory configure(Configuration configuration) {this.blockCacheSize =configuration.getLong("my.custom.rocksdb.block.cache.size", DEFAULT_SIZE);return this;}
}

Flink 应用要大规模可靠运行,必须满足两个条件:

  • 应用程序需要能够可靠地获取检查点

  • 资源需要有足够的资源在失败后赶上输入数据流

第一部分讨论如何大规模获得性能良好的检查点。最后一部分解释了有关规划要使用多少资源的一些最佳实践。

监控状态和检查点

监视检查点行为的最简单方法是通过 UI 的检查点部分。检查点监控文档展示了如何访问可用的检查点指标。

扩展检查点时特别感兴趣的两个数字(均通过任务级别指标 和Web 界面公开)是:

  • 操作员收到第一个检查点屏障的时间 当触发检查点的时间一直很高时,这意味着检查点屏障从源头到操作员需要很长时间。这通常表明系统在恒定背压下运行。

  • 对齐持续时间,定义为接收第一个和最后一个检查点屏障之间的时间。在未对齐的exactly-once检查点和at-least-once检查点期间,子任务正在处理来自上游子任务的所有数据,没有任何中断。然而,对于对齐的exatcly-once检查点,已经收到检查点屏障的通道被阻止发送更多数据,直到所有剩余的通道都赶上并接收到它们的检查点屏障(对齐时间)。

理想情况下,这两个值都应该较低 - 较高的数量意味着由于一些背压(没有足够的资源来处理传入的记录),检查点障碍会缓慢地通过作业图。这也可以通过增加的处理记录的端到端延迟来观察。请注意,在存在瞬态背压、数据倾斜或网络问题的情况下,这些数字有时会很高。

未对齐的检查点可用于加快检查点障碍的传播时间。但是请注意,这并不能解决首先导致背压的潜在问题(并且端到端记录延迟将保持很高)。

调优检查点

检查点按应用程序可以配置的固定时间间隔触发。当一个检查点的完成时间超过检查点间隔时,在进行中的检查点完成之前不会触发下一个检查点。默认情况下,一旦正在进行的检查点完成,将立即触发下一个检查点。

当检查点经常花费比基本间隔更长的时间(例如,因为状态增长大于计划,或者检查点存储的存储空间暂时缓慢),系统会不断获取检查点(一旦完成,新的会立即启动) . 这可能意味着在检查点中不断占用太多资源,并且操作员取得的进展太少。此行为对使用异步检查点状态的流应用程序影响较小,但仍可能对整体应用程序性能产生影响。

为了防止这种情况,应用程序可以定义检查点之间最短持续时间

StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

此持续时间是最新检查点结束与下一个检查点开始之间必须经过的最小时间间隔。下图说明了这如何影响检查点。

Flink 1.12 状态与容错

注意:可以(通过CheckpointConfig)配置应用程序以允许同时进行多个检查点。对于 Flink 中具有大状态的应用程序,这通常会在检查点中占用太多资源。当手动触发保存点时,它可能与正在进行的检查点同时进行。

调整 RocksDB

许多大型 Flink 流应用程序的状态存储主力是RocksDB 状态后端。后端远远超出主内存并可靠地存储大型键控状态。

RocksDB 的性能可能因配置而异,本节概述了一些使用 RocksDB 状态后端调优作业的最佳实践。

增量检查点

当谈到减少检查点花费的时间时,激活增量检查点应该是首要考虑因素之一。与完整检查点相比,增量检查点可以显着减少检查点时间,因为增量检查点仅记录与之前完成的检查点相比的更改,而不是生成状态后端的完整、自包含备份。

有关更多背景信息,请参阅RocksDB 中的增量检查点。

RocksDB 或 JVM 堆中的计时器

计时器默认存储在 RocksDB 中,这是更健壮和可扩展的选择。

当性能调整作业只有很少的计时器(没有窗口,不在 ProcessFunction 中使用计时器)时,将这些计时器放在堆上可以提高性能。请谨慎使用此功能,因为基于堆的计时器可能会增加检查点时间,并且自然无法扩展到内存之外。

有关如何配置基于堆的计时器的详细信息,请参阅本节。

调整 RocksDB 内存

RocksDB 状态后端的性能很大程度上取决于它可用的内存量。为了提高性能,增加内存可以有很大帮助,或者调整内存使用的功能。

默认情况下,RocksDB 状态后端使用 Flink 的管理内存预算来管理 RocksDBs 缓冲区和缓存 ( state.backend.rocksdb.memory.managed: true)。请参阅RocksDB 内存管理以了解该机制如何工作的背景知识。

要调整与内存相关的性能问题,以下步骤可能会有所帮助:

  • 尝试提高性能的第一步应该是增加托管内存的数量。这通常会大大改善这种情况,而不会增加调整低级 RocksDB 选项的复杂性。

    特别是对于大容器/进程大小,总内存中的大部分通常可以转到 RocksDB,除非应用程序逻辑本身需要大量 JVM 堆。默认托管内存分数(0.4)是保守的,并且在使用具有多 GB 进程大小的 TaskManager 时通常可以增加。

  • RocksDB 中写入缓冲区的数量取决于您在应用程序中的状态数量(管道中所有操作符的状态)。每个状态对应一个 ColumnFamily,它需要自己的写缓冲区。因此,具有许多状态的应用程序通常需要更多内存才能获得相同的性能。

  • 您可以通过设置state.backend.rocksdb.memory.managed: false. 尤其是针对基线进行测试(假设没有或宽松的容器内存限制)或与早期版本的 Flink 相比测试回归,这可能很有用。

    与托管内存设置(常量内存池)相比,不使用托管内存意味着 RocksDB 分配的内存与应用程序中的状态数量成正比(内存占用随应用程序变化而变化)。根据经验,非托管模式的上限(除非应用了 ColumnFamily 选项)大约为“140MB * num-states-across-all-tasks * num-slots”。计时器也算作状态!

  • 如果您的应用程序有许多状态并且您看到频繁的 MemTable 刷新(写入端瓶颈),但您无法提供更多内存,则可以增加进入写入缓冲区的内存比率 ( state.backend.rocksdb.memory.write-buffer-ratio)。有关详细信息,请参阅RocksDB 内存管理。

  • 在具有许多状态的设置中减少 MemTable 刷新次数的高级选项(专家模式)是通过以下方式调整 RocksDB 的 ColumnFamily 选项(arena 块大小、最大后台刷新线程等)RocksDBOptionsFactory

public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
      @Overridepublic DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
      // increase the max background flush threads when we have many states in one operator,// which means we would have many column families in one DB instance.return currentOptions.setMaxBackgroundFlushes(4);}@Overridepublic ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
      // decrease the arena block size from default 8MB to 1MB. return currentOptions.setArenaBlockSize(1024 * 1024);}@Overridepublic OptionsFactory configure(Configuration configuration) {
      return this;} }

容量规划

本节讨论如何决定应该使用多少资源来让 Flink 作业可靠运行。容量规划的基本经验法则是:

  • 正常运行应该有足够的容量,不会在恒定背压下运行。有关如何检查应用程序是否在背压下运行的详细信息,请参阅背压监控。

  • 在无故障期间无背压运行程序所需的资源之上,提供一些额外的资源。需要这些资源来“赶上”应用程序恢复期间积累的输入数据。这应该取决于恢复操作通常需要多长时间(这取决于需要在故障转移时加载到新 TaskManager 的状态的大小)以及场景需要故障恢复的速度。

    重要提示:基线应该在检查点激活的情况下建立,因为检查点会占用一定数量的资源(例如网络带宽)。

  • 临时背压通常是可以的,并且是负载高峰期间、追赶阶段或外部系统(写入接收器)表现出暂时减速时执行流控制的重要组成部分。

  • 某些操作(如大窗口)会导致其下游操作符的负载激增:对于窗口,下游操作符在构建窗口时可能没有什么可做的,而在发出窗口时却有负载要做。下游并行性的规划需要考虑窗口发出的数量以及处理此类尖峰所需的速度。

重要提示:为了以后可以添加资源,请务必将数据流程序的最大并行度设置为合理的数值。最大并行度定义了在重新缩放程序(通过保存点)时可以设置程序并行度的高度。

Flink 的内部簿记以 max-parallelism-many key group的粒度跟踪并行状态。Flink 的设计力求使最大并行度具有非常高的值变得高效,即使以低并行度执行程序。

压缩

Flink 为所有检查点和保存点提供可选的压缩(默认:关闭)。目前,压缩始终使用snappy 压缩算法(版本 1.1.4),但我们计划在未来支持自定义压缩算法。压缩对键控状态下的密钥组的粒度起作用,即每个密钥组可以单独解压缩,这对于重新缩放很重要。

可以通过以下方式激活压缩ExecutionConfig

ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setUseSnapshotCompression(true);

注意压缩选项对增量快照没有影响,因为它们使用的是 RocksDB 的内部格式,该格式始终使用开箱即用的 snappy 压缩。

任务本地恢复

动机

在 Flink 的检查点中,每个任务都会生成其状态的快照,然后将其写入分布式存储。每个任务通过发送一个描述状态在分布式存储中的位置的句柄来向作业管理器确认状态的成功写入。反过来,作业管理器从所有任务中收集句柄并将它们捆绑到一个检查点对象中。

在恢复的情况下,作业管理器打开最新的检查点对象并将句柄发送回相应的任务,然后可以从分布式存储中恢复它们的状态。使用分布式存储来存储状态有两个重要的优势。首先,存储是容错的,其次,所有节点都可以访问分布式存储中的所有状态,并且可以轻松地重新分配(例如,用于重新缩放)。

然而,使用远程分布式存储也有一个很大的缺点:所有任务都必须通过网络从远程位置读取它们的状态。在很多场景下,recovery 可以将失败的任务重新调度到和之前运行一样的任务管理器上(当然也有机器故障等例外),但是我们还是要读取远程状态。这可能导致大型状态的恢复时间过长,即使单台机器上只有一个小故障。

方法

任务本地状态恢复正是针对这个恢复时间长的问题,主要思想是:对于每个检查点,每个任务不仅将任务状态写入分布式存储,而且还在一个状态快照的二级副本中保存任务本地的存储(例如在本地磁盘或内存中)。请注意,快照的主存储必须仍然是分布式存储,因为本地存储不能确保节点故障下的持久性,也不能为其他节点提供重新分发状态的访问权限,因此该功能仍然需要主副本。

但是,对于可以重新调度到先前位置进行恢复的每个任务,我们可以从辅助的本地副本恢复状态,并避免远程读取状态的成本。鉴于许多故障不是节点故障,并且节点故障通常一次只影响一个或极少数节点,在恢复中,很可能大多数任务可以返回到它们之前的位置并找到它们的本地状态完好无损。这就是使本地恢复有效减少恢复时间的原因。

请注意,根据所选的状态后端和检查点策略,创建和存储辅助本地状态副本的每个检查点可能会产生一些额外的成本。例如,在大多数情况下,实现将简单地将分布式存储的写入复制到本地文件。

Flink 1.12 状态与容错

 

主(分布式存储)和辅助(任务本地)状态快照的关系

任务本地状态始终被视为次要副本,检查点状态的基本事实是分布式存储中的主要副本。这对检查点和恢复期间的本地状态问题有影响:

  • 对于检查点,主副本必须成功,生成次要本地副本失败不会使检查点失败。如果无法创建主副本,即使成功创建了辅助副本,检查点也会失败。

  • 只有主副本由作业管理器确认和管理,辅助副本由任务管理器拥有,并且它们的生命周期可以独立于它们的主副本。例如,可以保留 3 个最新检查点的历史记录作为主要副本,并且仅保留最新检查点的任务本地状态。

  • 对于恢复,如果有匹配的辅助副本可用,Flink 将始终首先尝试从任务本地状态恢复。如果从副副本恢复过程中出现任何问题,Flink 会透明地重试从主副本恢复任务。仅当主要副本和(可选)次要副本失败时,恢复才会失败。在这种情况下,根据配置 Flink 仍然可以回退到较旧的检查点。

  • 任务本地副本可能只包含完整任务状态的一部分(例如,写入一个本地文件时的异常)。在这种情况下,Flink 将首先尝试在本地恢复本地部分,从主副本恢复非本地状态。主状态必须始终是完整的,并且是任务本地状态超集

  • 任务本地状态可以具有与主状态不同的格式,它们不需要字节相同。例如,甚至有可能任务本地状态是由堆对象组成的内存中,而不存储在任何文件中。

  • 如果任务管理器丢失,则其所有任务的本地状态都将丢失。

配置任务本地恢复

任务本地恢复默认禁用的,可以通过 Flink 的配置使用state.backend.local-recovery中指定的密钥激活CheckpointingOptions.LOCAL_RECOVERY。此设置的值可以是true以启用或false(默认)以禁用本地恢复。

有关不同状态后端的任务本地恢复的详细信息

限制:目前,任务本地恢复仅涵盖键控状态后端。键控状态通常是状态的最大部分。在不久的将来,我们还将介绍操作员状态和计时器。

以下状态后端可以支持任务本地恢复。

  • FsStateBackend:键控状态支持任务本地恢复。该实现会将状态复制到本地文件。这会引入额外的写入成本并占用本地磁盘空间。将来,我们可能还会提供一种将任务本地状态保存在内存中的实现。

  • RocksDBStateBackend:键控状态支持任务本地恢复。对于完整检查点,状态被复制到本地文件。这会引入额外的写入成本并占用本地磁盘空间。对于增量快照,本地状态基于 RocksDB 的原生检查点机制。此机制也用作创建主副本的第一步,这意味着在这种情况下不会为创建辅助副本引入额外成本。我们只是保留本地检查点目录,而不是在上传到分布式存储后将其删除。这个本地副本可以与 RocksDB 的工作目录共享活动文件(通过硬链接),因此对于活动文件,使用增量快照进行任务本地恢复也不会消耗额外的磁盘空间。使用硬链接也意味着 RocksDB 目录必须与可用于存储本地状态的所有配置本地恢复目录位于同一物理设备上,否则建立硬链接可能会失败(参见 FLINK-10954)。目前,

分配保留调度

任务本地恢复假设故障下的分配保留任务调度,其工作原理如下。每个任务都会记住它之前的分配并请求完全相同的插槽在恢复中重新启动。如果此插槽不可用,则任务将从资源管理器请求新的新插槽。这样,如果任务管理器不再可用,则无法返回其先前位置的任务将不会将其他正在恢复的任务赶出其先前的插槽。我们的理由是,只有当任务管理器不再可用时,前一个插槽才会消失,在这种情况下,某些无论如何,任务都必须请求一个新的插槽。通过我们的调度策略,我们为最大数量的任务提供了从其本地状态中恢复的机会,并避免了任务从彼此窃取先前插槽的级联效应。

  相关解决方案