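Every function and operator in Flink can be stateful (see working with state for details). Stateful functions store data across the processing of individual elements or events, which makes state a critical building block for any more elaborate operation.
To make state fault tolerant, Flink needs to checkpoint it. Checkpoints allow Flink to recover both the state and the positions in the streams, giving the application the same semantics as a failure-free execution.
The documentation on streaming fault tolerance describes in detail the technique behind Flink's streaming fault tolerance mechanism.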
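Prerequisites
Flink's checkpointing mechanism interacts with durable storage for streams and state. In general, it requires:
- A persistent (durable) data source that can replay records for a certain amount of time. Examples are persistent message queues (e.g., Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file systems (e.g., HDFS, S3, GFS, NFS, Ceph, …).
- A persistent storage for state, typically a distributed file system (e.g., HDFS, S3, GFS, NFS, Ceph, …).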
Enabling and Configuring Checkpointing
By default, checkpointing is disabled. To enable it, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.
Other checkpointing parameters include:
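checkpoint storage: You can set the location where checkpoint snapshots are made durable. By default, Flink uses the JobManager's heap. For production deployments, a durable file system is recommended instead. See checkpoint storage for more details on the options available for job-wide and cluster-wide configuration.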
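exactly-once vs. at-least-once: You can optionally pass a mode to the enableCheckpointing(n) method to choose between the two guarantee levels. Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency applications (consistently a few milliseconds).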
checkpoint timeout: The time after which an in-progress checkpoint is aborted if it has not completed by then.
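minimum time between checkpoints: To make sure that the streaming application makes a certain amount of progress between checkpoints, you can define how much time must pass between them. If this value is set to 5000, for example, the next checkpoint starts no sooner than 5 seconds after the previous checkpoint completed, regardless of the checkpoint duration and the checkpoint interval. Note that this implies the checkpoint interval will never be smaller than this value.
It is often easier to configure an application through the "time between checkpoints" than through the checkpoint interval, because the "time between checkpoints" is not affected when checkpoints occasionally take longer than average (for example, when the target storage system is temporarily slow).
Note that setting this value also implies that the number of concurrent checkpoints is one.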
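tolerable checkpoint failure number: This defines how many consecutive checkpoint failures are tolerated before the whole job is failed. The default value is 0, which means no checkpoint failure is tolerated and the job fails on the first reported checkpoint failure.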
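number of concurrent checkpoints: By default, the system does not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend so much time on checkpoints that it stops making progress on the streams. It is possible to allow multiple overlapping checkpoints, which is useful for pipelines that have a certain processing delay (for example, because the functions call external services that need some time to respond) but still want to checkpoint very frequently (hundreds of milliseconds) so that very little is reprocessed after a failure.
This option cannot be used when a minimum time between checkpoints is defined.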
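externalized checkpoints: You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their metadata to persistent storage and are not automatically cleaned up when the job fails, so a checkpoint is available to resume from if your job fails. See the deployment notes on externalized checkpoints for more details.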
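prefer checkpoint for recovery: This determines whether a job falls back to the latest checkpoint even when more recent savepoints are available, which can potentially reduce recovery time.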
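unaligned checkpoints: You can enable unaligned checkpoints to greatly reduce checkpointing times under backpressure. This only works with exactly-once checkpoints and with a single concurrent checkpoint.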
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)
// only two consecutive checkpoint failures are tolerated
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// enables the experimental unaligned checkpoints
env.getCheckpointConfig.enableUnalignedCheckpoints()
// sets the checkpoint storage where checkpoint snapshots will be written
env.getCheckpointConfig.setCheckpointStorage("hdfs://my/checkpoint/dir")
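To make the effect of setMinPauseBetweenCheckpoints and setTolerableCheckpointFailureNumber more concrete, the following is a minimal, self-contained sketch of the two rules as plain arithmetic. It is a simplified model and not Flink's actual scheduler: the object CheckpointRulesSketch and the functions earliestNextStart and jobFailsAt are hypothetical names introduced only for illustration, and the model assumes a failure counter that resets on every successful checkpoint.

```scala
// Simplified model of two checkpointing rules. This is NOT Flink code;
// every name in this object is hypothetical and exists only for illustration.
object CheckpointRulesSketch {

  /** Lower bound (ms) on the start of the next checkpoint: the regular
    * interval tick, pushed back if the minimum pause since the previous
    * checkpoint completed has not yet elapsed. */
  def earliestNextStart(lastStartMs: Long, lastEndMs: Long,
                        intervalMs: Long, minPauseMs: Long): Long =
    math.max(lastStartMs + intervalMs, lastEndMs + minPauseMs)

  /** Index of the checkpoint whose failure fails the job, if any.
    * outcomes(i) is true when checkpoint i succeeded. Assumption: a
    * success resets the count of consecutive failures. */
  def jobFailsAt(outcomes: Seq[Boolean], tolerableFailures: Int): Option[Int] = {
    // Running count of consecutive failures after each checkpoint.
    val streaks = outcomes.scanLeft(0)((streak, ok) => if (ok) 0 else streak + 1).tail
    val idx = streaks.indexWhere(_ > tolerableFailures)
    if (idx >= 0) Some(idx) else None
  }

  def main(args: Array[String]): Unit = {
    // Interval 1000 ms, min pause 5000 ms, previous checkpoint ran from 0 to 800 ms.
    println(earliestNextStart(0L, 800L, 1000L, 5000L)) // 5800
    // A slow checkpoint (0 to 3000 ms) delays the next one beyond the interval.
    println(earliestNextStart(0L, 3000L, 1000L, 500L)) // 3500
    // With the default tolerance of 0, the first failure fails the job.
    println(jobFailsAt(Seq(true, false, true), 0)) // Some(1)
    // With a tolerance of 2, two failures followed by a success are fine.
    println(jobFailsAt(Seq(false, false, true), 2)) // None
  }
}
```

In this model the minimum pause only ever delays a checkpoint, which is why the effective interval can never drop below it.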
Related Config Options
Some more parameters and/or defaults may be set via conf/flink-conf.yaml (see configuration for a full guide):
Key | Default | Type | Description |
---|---|---|---|
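state.backend.incremental | false | Boolean | Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Once enabled, the state size shown in the web UI or fetched from the REST API only represents the delta checkpoint size instead of the full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option. |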
state.backend.local-recovery | false | Boolean | This option configures local recovery for this state backend. By default, local recovery is deactivated. Local recovery currently only covers keyed state backends. Currently, the MemoryStateBackend does not support local recovery and ignores this option. |
state.checkpoint-storage | (none) | String | The checkpoint storage implementation to be used to checkpoint state. The implementation can be specified either via their shortcut name, or via the class name of a CheckpointStorageFactory. If a factory is specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader) method is called. Recognized shortcut names are 'jobmanager' and 'filesystem'. |
state.checkpoints.dir | (none) | String | The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes (i.e. all TaskManagers and JobManagers). |
state.checkpoints.num-retained | 1 | Integer | The maximum number of completed checkpoints to retain. |
state.savepoints.dir | (none) | String | The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend). |
state.storage.fs.memory-threshold | 20 kb | MemorySize | The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB. |
state.storage.fs.write-buffer-size | 4096 | Integer | The default size of the write buffer for the checkpoint streams that write to file systems. The actual write buffer size is determined to be the maximum of the value of this option and option 'state.storage.fs.memory-threshold'. |
taskmanager.state.local.root-dirs | (none) | String | The config parameter defining the root directories for storing file-based state for local recovery. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does not support local recovery and ignores this option. |
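As an illustration of how the options in the table might be combined, here is a hedged sketch of a conf/flink-conf.yaml fragment. The keys come from the table above; every value, including both directory paths, is an assumption chosen for the example and should be adapted to the actual cluster.

```yaml
# conf/flink-conf.yaml (sketch; all values below are illustrative assumptions)
state.checkpoint-storage: filesystem               # one of the recognized shortcut names
state.checkpoints.dir: hdfs:///flink/checkpoints   # hypothetical path, must be reachable from all nodes
state.savepoints.dir: hdfs:///flink/savepoints     # hypothetical path
state.checkpoints.num-retained: 3                  # keep the three most recent completed checkpoints
state.backend.incremental: true                    # ignored by backends without incremental support
```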
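Selecting Checkpoint Storage
Flink's checkpointing mechanism stores consistent snapshots of all the state in timers and stateful operators, including connectors, windows, and any user-defined state. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured checkpoint storage.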
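By default, checkpoints are stored in memory in the JobManager. To persist large state properly, Flink supports various approaches for checkpointing state to other locations. The choice of checkpoint storage can be configured via
StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)
For production deployments, it is strongly encouraged to store checkpoints in a highly available file system.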
See checkpoint storage for more details.
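State Checkpoints in Iterative Jobs
Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. To force checkpointing on an iterative program, the user needs to set a special flag when enabling checkpointing: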
env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)
Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during a failure.