1.配置及注释
1.1定位
flink-conf.yaml是flink的配置文件,优先级在代码和提交参数之后是通用配置
1.2注意点
flink-conf.yaml中配置key/value时候在“:”后面需要有一个空格,否则配置不会生效。
1.3参数解析
#==============================================================================
# 集群通用配置
#==============================================================================
# Jobmanager的地址,taskmanager必须要识别并能连上。
# 只有standalone的集群模式起作用,当执行bin/jobmanager.sh --host<hostname>的时候将被覆盖
# 在YARN或者Mesos的集群模式下将自动替换为jobmanager所在节点的hostname.
jobmanager.rpc.address:
# jobmanager的RPC端口,和taskmanger通信端口
jobmanager.rpc.port: 6123
# JobManager能够使用的内存的最大值,包括Java虚拟机的metaspace和其他的开销;
jobmanager.memory.process.size: 1g
# taskmanager 能够使用的最大内存大小,包括Java虚拟机的metaspace和其他的开销;
taskmanager.memory.process.size: 4g
# 每个TaskManager提供的slot数量,每个slot上能运行一个并行的pipeline任务。
taskmanager.numberOfTaskSlots: 4
# 没有指定并行度的情况下,默认的全局并行度
parallelism.default: 8
#==============================================================================
# WEB配置
#==============================================================================
# Web的运行监视器端扣
#web.port: 8088
# REST客户端将连接到的地址
rest.port:
# 要绑定到的REST和web服务器的端口范围。
rest.bind-port:
# web的作业提交
web.submit.enable: true
#==============================================================================
# HA配置
#==============================================================================
yarn.application-attempts: 10
# 高可用模型的选择有:'NONE' 或者 'zookeeper'
high-availability: zookeeper
# 下面这个参数是指定master恢复所需要的元数据存储的路径。这个路径需要存储像持久化数据流图一样的大对象
# 改参数的指必须是所有节点可持久可访问的路径(e.g. HDFS, S3, Ceph, nfs, ...)
high-availability.storageDir: hdfs://:8020/flink/ha/
# ZooKeeper集群的地址,用于协调高可用模型的建立,而且必须是列表值(e.g. host1:clientPort,host2:clientPort,...)
# 默认的端口客户端端口是 2181
high-availability.zookeeper.quorum: :2181,:2181,:2181
high-availability.zookeeper.path.root: /flink
# 如果zookeeper安全策略开启,那么下面这个值就应该是creator,默认是oepn
#high-availability.zookeeper.client.acl: open
#==============================================================================
# statebackend配置
#==============================================================================
# 用于存储和检查点状态的存储类型:filesystem hdfs rocksdb
state.backend: rocksdb
# 开启增量ck 这里全局生效 用户代码也可设置
state.backend.incremental: true
#RocksDB放置其文件的本地目录(在TaskManager上)
state.backend.rocksdb.localdir:/opt/flink/flink-1.12.4/rocksdb
#==============================================================================
# checkpoints配置
#==============================================================================
# 保存最近的检查点数量
state.checkpoints.num-retained: 20
# checkpoints存储的地址
state.checkpoints.dir: hdfs://:8020/flink/flink-checkpoints
# 默认savepoints存储的地址(可选)
state.savepoints.dir: hdfs://:8020/flink/flink-savepoints
#定期调度检查点的时间间隔
execution.checkpointing.interval: 10min
#最大检查点尝试次数
execution.checkpointing.max-concurrent-checkpoints: 10
#可能同时进行的最大检查点尝试次数
execution.checkpointing.max-concurrent-checkpoints: 1
#检查点模式
execution.checkpointing.mode: EXACTLY_ONCE
#如果启用,当有最近的保存点时,作业恢复应该回落到检查点
execution.checkpointing.prefer-checkpoint-for-recovery: true
#重试次数
execution.checkpointing.tolerable-failed-checkpoints: 10
#非对齐
execution.checkpointing.unaligned: true
#==============================================================================
#restart-strategy配置
#==============================================================================
# 故障恢复的策略,怎么样从失败的任务重恢复计算。配置可选: full 或者 region
# full: 表示集群中的任务发生故障的时候,所有的task重新启动
# region: 表示集群中的任务发生故障的时候,仅重启被失败失败任务所影响的下游的任务和产生数据的上游任务。
jobmanager.execution.failover-strategy: region
# 设置重启策略为failure-rate
restart-strategy: failure-rate
# 失败作业之前的给定时间间隔内的最大重启次数,默认1
restart-strategy.failure-rate.max-failures-per-interval: 10
# 测量故障率的时间间隔。默认1min
restart-strategy.failure-rate.failure-rate-interval: 60min
# 两次连续重启尝试之间的延迟,默认akka.ask.timeout时间
restart-strategy.failure-rate.delay: 6min
#==============================================================================
# 日志配置
#==============================================================================
# HistoryServer 通过bin/historyserver.sh (start|stop)开启和关闭
# 基于Web的HistoryServer的端口号
#historyserver.web.port: 8082
# HistoryServer的服务地址
#historyserver.web.address: 0.0.0.0
# JobManager目录,用于存储已完成作业的档案
jobmanager.archive.fs.dir: hdfs://:8020/flink/completed-jobs/
# 逗号分隔的目录列表,用于从中获取存档的作业。历史记录服务器将监视这些目录中的已存档作业。
# 您可以配置JobManager通过jobmanager.archive.fs.dir将作业归档到目录中。
historyserver.archive.fs.dir: hdfs://:8020/flink/completed-jobs/
# 刷新存档的作业目录的时间间隔(单位:毫秒)
historyserver.archive.fs.refresh-interval: 10000
#==============================================================================
# metric收集信息
#==============================================================================
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host:
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: flink-metrics
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
classloader.check-leaked-classloader: false
#==============================================================================
#优化配置
#==============================================================================
# 类加载顺序的设定,Flink默认是'child-first',还可以选:'parent-first'
# 'child-first'这种加载机制允许用户在自己的应用中使用不同的依赖或者JAR包的版本
classloader.resolve-order: child-first
1.4 Flink高可用配置解释
在运行高可用性YARN群集时,我们不会运行多个JobManager(ApplicationMaster)实例,而只会运行一个,由YARN在失败时重新启动。确切的行为取决于您使用的特定YARN版本。
配置:
最大 Application Master 尝试数量 (yarn-site.xml)
您必须配置为尝试 application masters 的最大数量的 YARN的设置yarn-site.xml:
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
当前YARN版本的默认值为2(表示容忍一个JobManager失败)。
申请尝试次数:
除HA配置,您还必须配置最大尝试次数conf/flink-conf.yaml:
yarn.application-attempts: 10
这意味着在YARN未通过应用程序(9次重试+ 1次初始尝试)之前,应用程序可以重新启动9次以进行失败尝试。如果YARN 算子操作需要,YARN可以执行其他重新启动:抢占,节点硬件故障或重新启动或NodeManager重新同步。这些重启不计入yarn.application-attempts,请参阅Jian Fang的博客文章。重要的是要注意yarn.resourcemanager.am.max-attempts应用程序重新启动的上限。因此,Flink中设置的应用程序尝试次数不能超过启动YARN的YARN群集设置。(yarn.resourcemanager.am.max-attemps).
yarn.application-attempts <= yarn.resourcemanager.am.max-attempts
容器关闭行为
YARN 2.3.0 <版本<2.4.0。如果应用程序主机失败,则重新启动所有容器。
YARN 2.4.0 <版本<2.6.0。TaskManager容器在应用程序主故障期间??保持活动状态。这具有以下优点:启动时间更快并且用户不必再等待再次获得容器资源。
YARN 2.6.0 <=版本:将尝试失败有效性间隔设置为Flinks的Akka超时值。尝试失败有效性间隔表示只有在系统在一个间隔期间看到最大应用程序尝试次数后才会终止应用程序。这避免了持久的工作会耗尽它的应用程序尝试。
注意:Hadoop YARN 2.4.0有一个主要错误(在2.5.0中修复),阻止重新启动的Application Master / Job Manager容器重启容器。有关详细信息,请参阅FLINK-4142。我们建议至少在YARN上使用Hadoop 2.5.0进行高可用性设置。
配置HA模式和 zookeeper.quorum 在conf/flink-conf.yaml:
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: big-data-2:2181,big-data-3:2181,big-data-4:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_yarn_flink
注: 原先hadoop 集群已搭建了zookeeper集群,直接引用即可.