当前位置: 代码迷 >> 综合 >> Flink 常见问题排查与任务调优实践
  详细解决方案

Flink 常见问题排查与任务调优实践

热度:102   发布时间:2023-09-18 16:55:13.0

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践Flink 常见问题排查与任务调优实践

 Flink 问题排查 - 作业部署失败 现象:作业无法正常提交与启动

可能成因

确认方法

解决措施

程序包依赖与集群依赖存在版本冲突

日志:NoSuchMethodError/

IncompatibleClassChangeError/

ClassCastException

1. 程序包中 Flink/Hadoop 相关依赖设为 provided2. 使用 Maven-Shade-Plugin  

3. 调整 classloader.resolve-order

程序包缺少依赖

日志:1.NoClassDefFoundError

2.NoMatchingTableFactory

3.Could not instantiate the executor

程序包中添加 connector 依赖

程序包中添加 planner 依赖

升级 1.11 之后,程序包需要添加 flink-client 依赖

Flink Client 缺少依赖

Client 日志:Could not build the program from jar file.

NoClassDefFoundError: hadoop/jersey

Flink lib 目录添加 flink-hadoop-shade-uber-jar

Flink lib 目录添加 jersey-core-jar

集群资源不足

Client 日志:Deploy took more than 60s

Slot allocation request timed out

扩充集群资源

减少任务并行度

Flink 问题排查 - 作业运行异常 现象:作业突然停止运行且不恢复

可能成因

确认方法

解决措施

Source 算子实现方法不正确

JM 日志:作业结束于 FINISHED (SUCCEEDED) 状态

修改 Source 算子的实现,保持 while true 循环

作业重启次数达到阈值

TM 日志:restart strategy prevented it

1. 找出作业崩溃重启原因

2. 增大 RestartStrategy 阈值或者 yarn.application-attempt 阈值

JVM 内存用量超出 YARN/K8s 阈值

JM 日志:Killing container

通常因为 RocksDB 内存不受控导致,可升级为 Flink 1.11 以上版本

也可能是用户代码分配了直接内存

Flink 问题排查 - 作业处理缓慢 现象:作业输出量较稳定,但是不及预期值(5000 ~ 20000 条/秒/核)

可能成因

确认方法

解决措施

序列化、反序列化开销大

指标:CPU 使用率很高

采样:Kryo 等方法占比很高

1. 减少不必要的 rebalance

2. 换用更高效的 序列化框架

算法时间复杂度高(正则 hashCode)

指标:CPU 使用率很高

采样:用户自定义方法占比高

优化自定义方法的逻辑实现

增加作业并行度

数据倾斜

指标:某算子的不同子任务,          输入、输出指标相差很大

key 打散、rebalance、预聚合

低速外部系统

指标:CPU 利用率低

采样:外部 IO 耗时长

1. 批量存取

2. 本地缓存

3. Async I/O 异步交互

Flink 问题排查 - 作业处理缓慢 现象:作业输出量逐步减少,甚至完全无输出

可能成因

确认方法

解决措施

算子背压较高

指标:Flink UI 背压采样 显示红色(HIGH)

可根据 背压分析表 判断瓶颈算子,对其进行调优

Full GC 时间长

指标:GC 时间增长迅速

日志:GC 日志中 Full GC 频繁

1. 增加堆内存上限

2. 优化堆内存使用

数据源(Source)输出慢/发生异常

新建作业,只消费数据源,不经其他算子,输出到 Blackhole Sink

如果吞吐量仍然无法上升,则说明数据源有问题

数据目的(Sink)写入慢/发生异常

使用 Datagen Source,直接输出到数据目的

如果吞吐量仍然无法上升,则说明数据目的有问题

数据格式异常、逻辑错误

日志:存在大量数据异常报错,导致数据被丢掉或者无限重试

调整逻辑,过滤异常数据

快照太频繁

单个快照过大

堆内存中状态过多

指标:Flink UI 查看各个快照的大小以及完成时间。

指标:查看每个 TaskManager 的堆内存用量。

1. 如果快照较大,或完成时间较长,考虑减少快照频率,增大超时时间。

2. snapshotState 方法减少同步调用

3. 启用增量快照或非对齐检查点。

4. 如果用到窗口,可以减少窗口大小,增加 Sliding 窗口的移动周期。

5. 如果用到 GROUP BY,设置 Idle State Retention Time.

6. 自定义状态,设置 State TTL.

Watermark 因异常数据错乱

指标:算子的 Watermark 值远大于当前时间戳。

找出并过滤时间戳明显异常的数据(例如超过当前时间戳太久的数据)

Flink 问题排查 - 作业数据异常 现象:少量数据丢失

可能成因

确认方法

解决措施

逻辑(编程)错误

日志:观察日志中是否有异常。

采样:关闭 Operator Chaining,然后逐个算子观察指标、采样。

修复逻辑问题,从 Savepoint 重新运行作业

个别数据格式异常,造成整批次被丢弃

同上

针对异常数据做容错处理,或暂时关闭批量处理功能。

数据源的元数据改变

检查是否在运行期间改变了数据源的分区数、表定义等元数据

开启运行时分区自动发现

避免在运行期间修改数据源

数据目的不接纳某些数据

日志:检查数据目的(Sink)的相关报错和异常

放宽数据库表的限制。

过滤或补全异常数据。

Flink 问题排查 - 作业数据异常 现象:大量数据重新消费

可能成因

确认方法

解决措施

从错误的 Checkpoint/Savepoint 开始消费

日志:观察日志中 Kafka 等 Source 的 offset 是否回退

指标:观察数据 Lag 是否飙升

选择正确的 Savepoint,

重新运行作业

数据源发生异常

日志:观察日志中 Kafka 等 Source 是否存在异常

指标:观察 Kafka 各分区 Lag 是否接近

手动指定 offset,重新消费

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

 Flink 作业性能调优 – 任务资源

性能瓶颈

评判指标

解决措施

任务并行度

指标:存在数据 Lag;Operator Subtask 之间数据量差距大

设置合适的的任务并行度,一般与 Source 的分区数相同设置优先级:算子>env>命令行>配置文件

内存

指标:1. 存在数据 Lag

2. 观察 JM/TM Metrics - Memory & GC 存在异常

增大内存

手动指定内存各区域比例

CPU

指标:机器 CPU 使用率接近 100%;存在热点方法消耗大量 CPU 时间

JStack/JProfiler 找到 CPU 消耗高的方法,进一步处理

网络

指标:Network Memory

Netty Shuffle Buffers

1.加大 Network Buffer2.开启 Operator Chain,减少数据 shuffle

3.使用大带宽网卡

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践Flink 常见问题排查与任务调优实践

 Flink 作业性能调优 – 数据倾斜 优化点 – 数据去重

优化方案

优点

可能缺陷

Flink State + HashMap

实现简单

Hash 冲突导致性能下降

状态变大导致 GC 和 Checkpoint 问题

Bit Map/Roaring Bit Map

精准去重

内存占用较少

扩容比较困难

Bloom Filter

内存占用少

近似去重

Flink 常见问题排查与任务调优实践

Flink 作业性能调优 – 低速外部系统 优化点 – 维表 Join 

Flink 常见问题排查与任务调优实践

Flink 作业性能调优 – 大状态 优化清单 

优化点

注意事项

相关配置

Checkpoint 设置

合理设置  Checkpoint 间隔、停顿时间与超时时间

CheckpointInterval

MinPauseBetweenCheckpoints

State TTL + 批量存取

设置 State TTL,防止状态无限增长

State 批量存取,减少与状态后端交互

详见 StateTtlConfig

StateBackend 调优

超大状态情况下选择 RocksDB StateBackend 并妥善调优

开启增量 Checkpointrocksdb localdir 指定多硬盘分担写入压力

详见下页

Unaligned checkpoint

高反压情况下建议使用

State size 可能会有较大增长,导致I/O 增大,作业恢复时间变长

execution.checkpointing.

unaligned

execution.checkpointing.

alignment-timeout

Flink 常见问题排查与任务调优实践

Flink 作业性能调优 – 大状态 RocksDB 参数调优 

MemTable 系列参数 Write Buffer Size:控制 MemTable 的阈值。

Write Buffer 越大,写放大效应越小,写性能也会改善。默认大小是 64 MB。可以根据实际情况适当增大。

Write Buffer Count:控制内存中允许保留的 MemTable 最大个数。默认为 2,建议设置到 5 左右。

Block Cache 系列参数

Block Size:增加该配置导致写入性能增强,读取性能下降,需要搭配 Block Cache Size 调整。建议生产环境调整到 16 ~ 32 KB,内存充足可以设为 128 KB。

Block Cache Size:增加 Block 该配置可以明显增加读性能。默认大小为 8 MB,建议设置到 64 ~ 256 MB。

Generic 参数

Max Open Files:决定了 RocksDB 可以打开的最大文件句柄数,默认值是 5000。如果进程的 ulimit 没有限制,建议改为 -1(无限制)。

Index 和 Bloom Filter 系列参数

Cache Index And Filter Blocks:表示是否在内存里缓存索引和过滤器 Block。建议在 Key 具有局部热点时打开。

Optimize Filter For Hits:表示是否会给 L0 生成 Bloom Filter。建议在 Key 具有局部热点时打开。

Flush 和 Compaction 相关参数

Max Bytes For Level Base:表示 L1 层大小阈值。该参数太小,每层能存放的 SSTable 较少,导致层级很多,造成查找困难;该参数太大,每层 SSTable 较多,导致执行 Compaction 等操作的耗时较长,此时容易出现 Write Stall(写停止)现象,造成写入中断。默认值为256MB,建议设为 target_file_size 的倍数。

Max Bytes For Level Multiplier:决定了LSM Tree 每层级的大小阈值的倍数关系。根据实际情况进行调整。

Target File Size:表示上一级的 SST 文件达到多大时触发 Compaction 操作,默认值是 64MB(每增加一级,阈值会自动乘以 target_file_size_multiplier)。 为了减少 Compaction 的频率,生产环境可调整为 128MB 。

Thread Num:表示后台进行 Compaction 和 Flush 操作的线程数。默认为 1,生产环境建议调大为 4 。

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

问题追因实践 性能分析 – JProfiler 找出热点方法

Flink 常见问题排查与任务调优实践

 问题追因实践 性能分析 – Java Flight Recorder 生成火焰图

Flink 常见问题排查与任务调优实践 

 Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

 Flink 常见问题排查与任务调优实践

 Flink 常见问题排查与任务调优实践

Flink 常见问题排查与任务调优实践

 Flink 常见问题排查与任务调优实践

坚持学习雀食需要强大的意志,屏蔽外界的干扰,就是学,就是卷才能成为至强者

  相关解决方案