Flink1.3
概览
Checkpoints可以拿Flink中的状态具有容错性,它是通过允许恢复状态和流原有的位置,因此使应用程序具有无故障运行语义.
查看Checkpointing来为你的程序开启和配置checkpoints功能.
checkpoint存储
当开启checkpointing,管理状态被持久化来保证任务失败恢复到一致性. checkpoiting过程中持久化状态被保存在哪里是依赖选择什么Checkpoint存储.
可用的Checkpoint存储选项
Flink捆绑提供checkpoint存储类型
- JobManagerCheckpointStorage
- FileSystemCheckpointStorage
如果checkpoint目录被配置,使用的是
FileSystemCheckpointStorage,否则使用的是
JobManagerCheckpointStorage
JobManagerCheckpointStorage
JobManagerCheckpointStorage 存储checkpoint快照 在JobManager的堆中
如果内存超过指定大小 会发送内存溢出,会导致配置的checkpoint失败. 为设置这个特性,用户用相对应最大的大小实例化JobManagerCheckpointStorage
.
new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);
JobManagerCheckpointStorage的限制
- 每个独立状态的大小默认设置为5MB. 在JobManagerCheckpointStorage构造函数里这个值可以增加.
- 不顾配置状态大小,状态大小不能大于Akka frame的大小(查看配置)
- 聚合状态占用大小一定要能在JobManager内存之内.
JobManagerCheckpointStorage 推荐使用
- 本地开发或是调试
- 任务使用占用空间非常小的状态,像一条记录一次的函数中(Map, FlatMap,Filter..). Kafka消费者需要非常小的状态.
FileSystemCheckpointStorage
FileSystemCheckpointStorage需要用系统URL(类型,地址,路径),像hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”.
配置checkpointing之后 ,会把状态快照写入配置好的系统文件或目录.最小的元数据需要被存储在Jobmanager的内存中(或是在高可用模式,在元数据checkpoint).
如果checkpoint目录被指定,FileSystemCheckpointStorage
就会被使用,来持久化checkpoint快照.
FileSystemCheckpointStorage
推荐在高可用中设置.
也推荐设置管理内存为0.这个确保大量内存内存分配到用户JVM上.
保存Checkpoint
checkpoints默认是不被保存,只是 使用失败的任务可以从中恢复.当程序被停用时被删除. 你也可以配置周期checkpoint保存起来. 这取决于配置保存checkpoints是否被删除当程序失败或是停用. 这种方式你有一个checkpoin可以恢复当你的任务失败时.
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
ExternalizedCheckpointCleanup
模式可以配置在你的任务被取消时怎么处理
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
:保存checkpoint当任务取消.注意当任务失败你要手动清除这些状态.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 当任务失败时取消checkpoint.checkpoint状态仅仅可用是任务失败时.
目录结构
和stavepoints类似,checkpoint包含元数据文件和状态后端的依赖和一些额外数据文件.元数据文件和数据文件被存储在目录中,通过设置配置文件中state.checkpoints.dir, 也可设置在不同任务的代码里.
当前checkpoint目录结构(在FLINK-8531被介绍)如下
/user-defined-checkpoint-dir/{job-id}|+ --shared/+ --taskowned/+ --chk-1/+ --chk-2/+ --chk-3/...
share目录是放状态的,里边可能有多个checkpoints, taskowned也是给状态用的,一定不能被Jobmanager删除, EXCLUSIVE 属于一个检验状态.
checkpoint目录不同 公共APIT,后期可能改变
通过配置文件来配置全局
state.checkpoints.dir: hdfs:///checkpoints/
为每个任务配置checkpoints
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
配置存储实例
另外,checkpoint存储可以被设置指定一个checkpoint存储实例,这样允许设置低级别配置,像写入缓存大小 .
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs:///checkpoints-data/", FILE_SIZE_THESHOLD));
和Savepoint的不同
checkpoint和savepoint有几点不同
- 使用后台 指定(低级别)数据格式,可能会增长.
- 不支持Flink特性,像扩展
从保存的checkpoint数据恢复
任务恢复是从使用checkpoint就像从savepoint通过使用checkpoint元数据文件替代. (查看savepoint恢复). 注意如果元数据不能自包含,jobmanager需要访问数据文件它的指向:(查看目录结构)
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]