当前位置: 代码迷 >> 综合 >> Flink1.4 Checkpoints
  详细解决方案

Flink1.4 Checkpoints

热度:93   发布时间:2024-01-05 10:19:24.0

Flink1.3

概览

Checkpoints可以拿Flink中的状态具有容错性,它是通过允许恢复状态和流原有的位置,因此使应用程序具有无故障运行语义.

查看Checkpointing来为你的程序开启和配置checkpoints功能.

checkpoint存储

当开启checkpointing,管理状态被持久化来保证任务失败恢复到一致性. checkpoiting过程中持久化状态被保存在哪里是依赖选择什么Checkpoint存储.

可用的Checkpoint存储选项

Flink捆绑提供checkpoint存储类型

  • JobManagerCheckpointStorage
  • FileSystemCheckpointStorage

如果checkpoint目录被配置,使用的是FileSystemCheckpointStorage,否则使用的是JobManagerCheckpointStorage

JobManagerCheckpointStorage

JobManagerCheckpointStorage 存储checkpoint快照 在JobManager的堆中

如果内存超过指定大小 会发送内存溢出,会导致配置的checkpoint失败. 为设置这个特性,用户用相对应最大的大小实例化JobManagerCheckpointStorage .

new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE);

JobManagerCheckpointStorage的限制

  • 每个独立状态的大小默认设置为5MB. 在JobManagerCheckpointStorage构造函数里这个值可以增加.
  • 不顾配置状态大小,状态大小不能大于Akka frame的大小(查看配置)
  • 聚合状态占用大小一定要能在JobManager内存之内.

JobManagerCheckpointStorage 推荐使用

  • 本地开发或是调试
  • 任务使用占用空间非常小的状态,像一条记录一次的函数中(Map, FlatMap,Filter..). Kafka消费者需要非常小的状态.

FileSystemCheckpointStorage

FileSystemCheckpointStorage需要用系统URL(类型,地址,路径),像hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”.

配置checkpointing之后 ,会把状态快照写入配置好的系统文件或目录.最小的元数据需要被存储在Jobmanager的内存中(或是在高可用模式,在元数据checkpoint).

如果checkpoint目录被指定,FileSystemCheckpointStorage 就会被使用,来持久化checkpoint快照.

FileSystemCheckpointStorage 推荐在高可用中设置.

也推荐设置管理内存为0.这个确保大量内存内存分配到用户JVM上.

保存Checkpoint

checkpoints默认是不被保存,只是 使用失败的任务可以从中恢复.当程序被停用时被删除. 你也可以配置周期checkpoint保存起来. 这取决于配置保存checkpoints是否被删除当程序失败或是停用. 这种方式你有一个checkpoin可以恢复当你的任务失败时.

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

 ExternalizedCheckpointCleanup 模式可以配置在你的任务被取消时怎么处理

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:保存checkpoint当任务取消.注意当任务失败你要手动清除这些状态.
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 当任务失败时取消checkpoint.checkpoint状态仅仅可用是任务失败时.

目录结构

和stavepoints类似,checkpoint包含元数据文件和状态后端的依赖和一些额外数据文件.元数据文件和数据文件被存储在目录中,通过设置配置文件中state.checkpoints.dir, 也可设置在不同任务的代码里.

当前checkpoint目录结构(在FLINK-8531被介绍)如下

/user-defined-checkpoint-dir/{job-id}|+ --shared/+ --taskowned/+ --chk-1/+ --chk-2/+ --chk-3/... 

share目录是放状态的,里边可能有多个checkpoints, taskowned也是给状态用的,一定不能被Jobmanager删除, EXCLUSIVE 属于一个检验状态.

checkpoint目录不同 公共APIT,后期可能改变

通过配置文件来配置全局

state.checkpoints.dir: hdfs:///checkpoints/

为每个任务配置checkpoints

env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");

配置存储实例

另外,checkpoint存储可以被设置指定一个checkpoint存储实例,这样允许设置低级别配置,像写入缓存大小 .

env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs:///checkpoints-data/", FILE_SIZE_THESHOLD));

和Savepoint的不同

 checkpoint和savepoint有几点不同 

  • 使用后台 指定(低级别)数据格式,可能会增长.
  • 不支持Flink特性,像扩展

从保存的checkpoint数据恢复

任务恢复是从使用checkpoint就像从savepoint通过使用checkpoint元数据文件替代. (查看savepoint恢复). 注意如果元数据不能自包含,jobmanager需要访问数据文件它的指向:(查看目录结构)

$ bin/flink run -s :checkpointMetaDataPath [:runArgs]