Flink 问题排查 - 作业部署失败 现象:作业无法正常提交与启动
可能成因 |
确认方法 |
解决措施 |
程序包依赖与集群依赖存在版本冲突 |
日志:NoSuchMethodError/ IncompatibleClassChangeError/ ClassCastException |
1. 程序包中 Flink/Hadoop 相关依赖设为 provided2. 使用 Maven-Shade-Plugin 3. 调整 classloader.resolve-order |
程序包缺少依赖 |
日志:1.NoClassDefFoundError 2.NoMatchingTableFactory 3.Could not instantiate the executor |
程序包中添加 connector 依赖 程序包中添加 planner 依赖 升级 1.11 之后,程序包需要添加 flink-client 依赖 |
Flink Client 缺少依赖 |
Client 日志:Could not build the program from jar file. NoClassDefFoundError: hadoop/jersey |
Flink lib 目录添加 flink-hadoop-shade-uber-jar Flink lib 目录添加 jersey-core-jar |
集群资源不足 |
Client 日志:Deploy took more than 60s Slot allocation request timed out |
扩充集群资源 减少任务并行度 |
Flink 问题排查 - 作业运行异常 现象:作业突然停止运行且不恢复
可能成因 |
确认方法 |
解决措施 |
Source 算子实现方法不正确 |
JM 日志:作业结束于 FINISHED (SUCCEEDED) 状态 |
修改 Source 算子的实现,保持 while true 循环 |
作业重启次数达到阈值 |
TM 日志:restart strategy prevented it |
1. 找出作业崩溃重启原因 2. 增大 RestartStrategy 阈值或者 yarn.application-attempt 阈值 |
JVM 内存用量超出 YARN/K8s 阈值 |
JM 日志:Killing container |
通常因为 RocksDB 内存不受控导致,可升级为 Flink 1.11 以上版本 也可能是用户代码分配了直接内存 |
Flink 问题排查 - 作业处理缓慢 现象:作业输出量较稳定,但是不及预期值(5000 ~ 20000 条/秒/核)
可能成因 |
确认方法 |
解决措施 |
序列化、反序列化开销大 |
指标:CPU 使用率很高 采样:Kryo 等方法占比很高 |
1. 减少不必要的 rebalance 2. 换用更高效的 序列化框架 |
算法时间复杂度高(正则 hashCode) |
指标:CPU 使用率很高 采样:用户自定义方法占比高 |
优化自定义方法的逻辑实现 增加作业并行度 |
数据倾斜 |
指标:某算子的不同子任务, 输入、输出指标相差很大 |
key 打散、rebalance、预聚合 |
低速外部系统 |
指标:CPU 利用率低 采样:外部 IO 耗时长 |
1. 批量存取 2. 本地缓存 3. Async I/O 异步交互 |
Flink 问题排查 - 作业处理缓慢 现象:作业输出量逐步减少,甚至完全无输出
可能成因 |
确认方法 |
解决措施 |
算子背压较高 |
指标:Flink UI 背压采样 显示红色(HIGH) |
可根据 背压分析表 判断瓶颈算子,对其进行调优 |
Full GC 时间长 |
指标:GC 时间增长迅速 日志:GC 日志中 Full GC 频繁 |
1. 增加堆内存上限 2. 优化堆内存使用 |
数据源(Source)输出慢/发生异常 |
新建作业,只消费数据源,不经其他算子,输出到 Blackhole Sink |
如果吞吐量仍然无法上升,则说明数据源有问题 |
数据目的(Sink)写入慢/发生异常 |
使用 Datagen Source,直接输出到数据目的 |
如果吞吐量仍然无法上升,则说明数据目的有问题 |
数据格式异常、逻辑错误 |
日志:存在大量数据异常报错,导致数据被丢掉或者无限重试 |
调整逻辑,过滤异常数据 |
快照太频繁 单个快照过大 堆内存中状态过多 |
指标:Flink UI 查看各个快照的大小以及完成时间。 指标:查看每个 TaskManager 的堆内存用量。 |
1. 如果快照较大,或完成时间较长,考虑减少快照频率,增大超时时间。 2. snapshotState 方法减少同步调用 3. 启用增量快照或非对齐检查点。 4. 如果用到窗口,可以减少窗口大小,增加 Sliding 窗口的移动周期。 5. 如果用到 GROUP BY,设置 Idle State Retention Time. 6. 自定义状态,设置 State TTL. |
Watermark 因异常数据错乱 |
指标:算子的 Watermark 值远大于当前时间戳。 |
找出并过滤时间戳明显异常的数据(例如超过当前时间戳太久的数据) |
Flink 问题排查 - 作业数据异常 现象:少量数据丢失
可能成因 |
确认方法 |
解决措施 |
逻辑(编程)错误 |
日志:观察日志中是否有异常。 采样:关闭 Operator Chaining,然后逐个算子观察指标、采样。 |
修复逻辑问题,从 Savepoint 重新运行作业 |
个别数据格式异常,造成整批次被丢弃 |
同上 |
针对异常数据做容错处理,或暂时关闭批量处理功能。 |
数据源的元数据改变 |
检查是否在运行期间改变了数据源的分区数、表定义等元数据 |
开启运行时分区自动发现 避免在运行期间修改数据源 |
数据目的不接纳某些数据 |
日志:检查数据目的(Sink)的相关报错和异常 |
放宽数据库表的限制。 过滤或补全异常数据。 |
Flink 问题排查 - 作业数据异常 现象:大量数据重新消费
可能成因 |
确认方法 |
解决措施 |
从错误的 Checkpoint/Savepoint 开始消费 |
日志:观察日志中 Kafka 等 Source 的 offset 是否回退 指标:观察数据 Lag 是否飙升 |
选择正确的 Savepoint, 重新运行作业 |
数据源发生异常 |
日志:观察日志中 Kafka 等 Source 是否存在异常 指标:观察 Kafka 各分区 Lag 是否接近 |
手动指定 offset,重新消费 |
Flink 作业性能调优 – 任务资源
性能瓶颈 |
评判指标 |
解决措施 |
任务并行度 |
指标:存在数据 Lag;Operator Subtask 之间数据量差距大 |
设置合适的的任务并行度,一般与 Source 的分区数相同设置优先级:算子>env>命令行>配置文件 |
内存 |
指标:1. 存在数据 Lag 2. 观察 JM/TM Metrics - Memory & GC 存在异常 |
增大内存 手动指定内存各区域比例 |
CPU |
指标:机器 CPU 使用率接近 100%;存在热点方法消耗大量 CPU 时间 |
JStack/JProfiler 找到 CPU 消耗高的方法,进一步处理 |
网络 |
指标:Network Memory Netty Shuffle Buffers |
1.加大 Network Buffer2.开启 Operator Chain,减少数据 shuffle 3.使用大带宽网卡 |
Flink 作业性能调优 – 数据倾斜 优化点 – 数据去重
优化方案 |
优点 |
可能缺陷 |
Flink State + HashMap |
实现简单 |
Hash 冲突导致性能下降 状态变大导致 GC 和 Checkpoint 问题 |
Bit Map/Roaring Bit Map |
精准去重 内存占用较少 |
扩容比较困难 |
Bloom Filter |
内存占用少 |
近似去重 |
Flink 作业性能调优 – 低速外部系统 优化点 – 维表 Join
Flink 作业性能调优 – 大状态 优化清单
优化点 |
注意事项 |
相关配置 |
Checkpoint 设置 |
合理设置 Checkpoint 间隔、停顿时间与超时时间 |
CheckpointInterval MinPauseBetweenCheckpoints |
State TTL + 批量存取 |
设置 State TTL,防止状态无限增长 State 批量存取,减少与状态后端交互 |
详见 StateTtlConfig |
StateBackend 调优 |
超大状态情况下选择 RocksDB StateBackend 并妥善调优 开启增量 Checkpointrocksdb localdir 指定多硬盘分担写入压力 |
详见下页 |
Unaligned checkpoint |
高反压情况下建议使用 State size 可能会有较大增长,导致I/O 增大,作业恢复时间变长 |
execution.checkpointing. unaligned execution.checkpointing. alignment-timeout |
Flink 作业性能调优 – 大状态 RocksDB 参数调优
MemTable 系列参数 Write Buffer Size:控制 MemTable 的阈值。
Write Buffer 越大,写放大效应越小,写性能也会改善。默认大小是 64 MB。可以根据实际情况适当增大。
Write Buffer Count:控制内存中允许保留的 MemTable 最大个数。默认为 2,建议设置到 5 左右。
Block Cache 系列参数
Block Size:增加该配置导致写入性能增强,读取性能下降,需要搭配 Block Cache Size 调整。建议生产环境调整到 16 ~ 32 KB,内存充足可以设为 128 KB。
Block Cache Size:增加 Block 该配置可以明显增加读性能。默认大小为 8 MB,建议设置到 64 ~ 256 MB。
Generic 参数
Max Open Files:决定了 RocksDB 可以打开的最大文件句柄数,默认值是 5000。如果进程的 ulimit 没有限制,建议改为 -1(无限制)。
Index 和 Bloom Filter 系列参数
Cache Index And Filter Blocks:表示是否在内存里缓存索引和过滤器 Block。建议在 Key 具有局部热点时打开。
Optimize Filter For Hits:表示是否会给 L0 生成 Bloom Filter。建议在 Key 具有局部热点时打开。
Flush 和 Compaction 相关参数
Max Bytes For Level Base:表示 L1 层大小阈值。该参数太小,每层能存放的 SSTable 较少,导致层级很多,造成查找困难;该参数太大,每层 SSTable 较多,导致执行 Compaction 等操作的耗时较长,此时容易出现 Write Stall(写停止)现象,造成写入中断。默认值为256MB,建议设为 target_file_size 的倍数。
Max Bytes For Level Multiplier:决定了LSM Tree 每层级的大小阈值的倍数关系。根据实际情况进行调整。
Target File Size:表示上一级的 SST 文件达到多大时触发 Compaction 操作,默认值是 64MB(每增加一级,阈值会自动乘以 target_file_size_multiplier)。 为了减少 Compaction 的频率,生产环境可调整为 128MB 。
Thread Num:表示后台进行 Compaction 和 Flush 操作的线程数。默认为 1,生产环境建议调大为 4 。
问题追因实践 性能分析 – JProfiler 找出热点方法
问题追因实践 性能分析 – Java Flight Recorder 生成火焰图
坚持学习雀食需要强大的意志,屏蔽外界的干扰,就是学,就是卷才能成为至强者