问题描述
Flink 接入 Kafka 数据并写入 HDFS 集群,作业正常运行一段时间(20 min 到 1 h)后报错,进入 FAILED 状态挂掉。
报错信息
检查点问题:
Flink job failed with "Checkpoint Coordinator is suspending."
2020-12-26 20:58:54
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=10000)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Could not perform checkpoint 286 for operator Source: Custom Source -> Map -> Filter -> Sink: Unnamed (1/1).
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:822)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$4(StreamTask.java:789)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 286 for operator Source: Custom Source -> Map -> Filter -> Sink: Unnamed (1/1). Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:892)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:882)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:813)
... 11 more
Caused by: java.io.IOException: Failing write. Tried pipeline recovery 5 times without success.
at org.apache.hadoop.hdfs.DataStreamer.processDatanodeOrExternalError(DataStreamer.java:1250)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:667)
原因
自定义分桶器(Bucketer)和 Sink 时设置的策略不当导致:
背景
- 每接入一条 Kafka 数据,都会根据数据字段中的时间生成 HDFS 上的文件目录和文件后缀。由于未活跃分桶关闭时长 INACTIVE_BUCKET_THRESHOLD 设置过长,长时间没有新数据写入的文件迟迟不被关闭,导致 HDFS 上积累了大量未关闭的文件。
定位和测试
在黑盒测试中,BATCH_ROLLOVER_INTERVAL(按固定时间滚动生成新文件的策略,与按文件大小滚动的策略相对应)为 1 分钟时,作业运行正常(连续运行 3 天未挂掉)。
在将 BATCH_ROLLOVER_INTERVAL 固定为 60s 的前提下,把未活跃关闭等待时长 INACTIVE_BUCKET_THRESHOLD 分别修改为:
测试用例:
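1:BATCH_ROLLOVER_INTERVAL 为 60s,INACTIVE_BUCKET_THRESHOLD 为 30 min
2:BATCH_ROLLOVER_INTERVAL 为 60s,INACTIVE_BUCKET_THRESHOLD 为 15 min
3:BATCH_ROLLOVER_INTERVAL 为 60s,INACTIVE_BUCKET_THRESHOLD 为 5 min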
报错的配置:
30*60*1000L
与 20*60*1000
(未活跃文件最少 20 min 后才会被关闭)
运行成功的配置:
30*60*1000L
与 60*1000
(未活跃文件最少 1 min 后即被关闭)
源码默认的配置:BucketingSink 中 DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60 * 1000L,即未活跃分桶默认 60s 后关闭。
解决方案:将未活跃关闭时长 INACTIVE_BUCKET_THRESHOLD 改回源码默认的 60s(BATCH_ROLLOVER_INTERVAL 保持 60s),使长时间无新数据写入的文件能及时关闭。