背景
- 問(wèn)題1. 近期在開(kāi)發(fā)flink-sql期間,發(fā)現(xiàn)數(shù)據(jù)在啟動(dòng)后,任務(wù)總是進(jìn)行重試,運(yùn)行一段時(shí)間后,container heartbeat timeout,內(nèi)存溢出(GC overhead limit exceede) ,作業(yè)無(wú)法進(jìn)行正常工作
023-10-07 14:53:30,408 | INFO | [flink-akka.actor.default-dispatcher-29] | Stopping worker container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041). | org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.internalStopWorker(ActiveResourceManager.java:461)
2023-10-07 14:53:30,408 | INFO | [flink-akka.actor.default-dispatcher-29] | Stopping container container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041). | org.apache.flink.yarn.YarnResourceManagerDriver.releaseResource(YarnResourceManagerDriver.java:298)
2023-10-07 14:53:30,409 | INFO | [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #1] | Processing Event EventType: STOP_CONTAINER for Container container_e03_1678102291469_2749_01_000002 | org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor.run(NMClientAsyncImpl.java:955)
2023-10-07 14:53:30,824 | WARN | [flink-akka.actor.default-dispatcher-29] | Remote connection to [/10.155.0.9:42366] failed with java.io.IOException: Connection reset by peer | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 14:53:30,825 | WARN | [flink-akka.actor.default-dispatcher-29] | Association with remote system [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331] has failed, address is now gated for [50] ms. Reason: [Disassociated] | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 14:53:30,825 | WARN | [flink-metrics-6] | Association with remote system [akka.tcp://flink-metrics@node-group-1jpmk0002.mrs-qrmc.com:28852] has failed, address is now gated for [50] ms. Reason: [Disassociated] | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 14:53:32,171 | INFO | [flink-akka.actor.default-dispatcher-29] | Received 1 containers. | org.apache.flink.yarn.YarnResourceManagerDriver$YarnContainerEventHandler.lambda$onContainersAllocated$1(YarnResourceManagerDriver.java:620)
2023-10-07 14:53:32,172 | INFO | [flink-akka.actor.default-dispatcher-29] | Received 1 containers with priority 1, 1 pending container requests. | org.apache.flink.yarn.YarnResourceManagerDriver.onContainersOfPriorityAllocated(YarnResourceManagerDriver.java:352)
2023-10-07 14:53:32,172 | INFO | [flink-akka.actor.default-dispatcher-29] | Removing container request Capability[<memory:4096, vCores:3>]Priority[1]AllocationRequestId[0]ExecutionTypeRequest[{Execution Type: GUARANTEED, Enforce Execution Type: false}]Resource Profile[null]. | org.apache.flink.yarn.YarnResourceManagerDriver.removeContainerRequest(YarnResourceManagerDriver.java:405)
2023-10-07 14:53:32,172 | INFO | [flink-akka.actor.default-dispatcher-29] | Accepted 1 requested containers, returned 0 excess containers, 0 pending container requests of resource <memory:4096, vCores:3>. | org.apache.flink.yarn.YarnResourceManagerDriver.onContainersOfPriorityAllocated(YarnResourceManagerDriver.java:392)
2023-10-07 14:53:32,172 | INFO | [cluster-io-thread-4] | TaskExecutor container_e03_1678102291469_2749_01_000003(node-group-1jPmk0002.mrs-qrmc.com:8041) will be started on node-group-1jPmk0002.mrs-qrmc.com with TaskExecutorProcessSpec {cpuCores=3.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemorySize=1.340gb (1438814063 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=409.600mb (429496736 bytes), numSlots=6}. | org.apache.flink.yarn.YarnResourceManagerDriver.createTaskExecutorLaunchContext(YarnResourceManagerDriver.java:472)
2023-10-07 14:53:32,178 | INFO | [cluster-io-thread-4] | Creating container launch context for TaskManagers | org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:500)
2023-10-07 14:53:32,179 | INFO | [cluster-io-thread-4] | Starting TaskManagers | org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:520)
2023-10-07 14:53:32,182 | INFO | [flink-akka.actor.default-dispatcher-29] | Requested worker container_e03_1678102291469_2749_01_000003(node-group-1jPmk0002.mrs-qrmc.com:8041) with resource spec WorkerResourceSpec {cpuCores=3.0, taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb (1438814063 bytes), numSlots=6}. | org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$1(ActiveResourceManager.java:419)
2023-10-07 14:53:32,182 | INFO | [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #2] | Processing Event EventType: START_CONTAINER for Container container_e03_1678102291469_2749_01_000003 | org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor.run(NMClientAsyncImpl.java:955)
2023-10-07 14:53:32,554 | INFO | [flink-akka.actor.default-dispatcher-29] | Heartbeat of TaskManager with id container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041) timed out. | org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1372)
2023-10-07 14:53:32,558 | INFO | [flink-akka.actor.default-dispatcher-29] | ConstraintEnforcer[20] (1/6) (1a65cb345c504de7b9704270217856a7) switched from RUNNING to FAILED on container_e03_1678102291469_2749_01_000002 @ node-group-1jPmk0002.mrs-qrmc.com (dataPort=32396). | org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1424)
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041) timed out.
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1373) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:155) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_332]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_332]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_332]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_332]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_332]
2023-10-07 14:53:32,567 | INFO | [flink-akka.actor.default-dispatcher-29] | Call stack:
at java.lang.Thread.getStackTrace(Thread.java:1564)
at org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1432)
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1124)
at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:908)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.updateStateInternal(DefaultExecutionGraph.java:1318)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.updateState(DefaultExecutionGraph.java:1277)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:733)
at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1543)
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1119)
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1059)
at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:760)
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:482)
at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:474)
at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:445)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:505)
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1378)
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1373)
- 問(wèn)題2. 未出現(xiàn)container心跳超時(shí)的,作業(yè)運(yùn)行緩慢,超過(guò)一天 ,作業(yè)仍存在反壓情況
問(wèn)題分析
- 查看日志內(nèi)容發(fā)現(xiàn),出現(xiàn)內(nèi)存溢出,
2023-10-07 17:58:27,526 | INFO | [Checkpoint Timer] | Triggering checkpoint 11 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1696672707523 for job f80d756ccb03ed8d6c19d114e0ba9e63. | org.apache.flink.runtime.checkpoint.CheckpointCoordinator.createPendingCheckpoint(CheckpointCoordinator.java:832)
2023-10-07 17:59:31,393 | INFO | [flink-metrics-4] | No response from remote for outbound association. Handshake timed out after [60000 ms]. | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$3(Slf4jLogger.scala:96)
2023-10-07 17:59:31,396 | WARN | [flink-metrics-4] | Association with remote system [akka.tcp://flink-metrics@node-group-1jpmk0002.mrs-qrmc.com:28852] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@node-group-1jpmk0002.mrs-qrmc.com:28852]] Caused by: [No response from remote for outbound association. Handshake timed out after [60000 ms].] | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
**2023-10-07 18:00:15,005 | INFO | [flink-akka.actor.default-dispatcher-22] | LookupJoin[29] (3/3) (803c659fe1f2ef1c5886f73bab51a914) switched from RUNNING to FAILED on container_e03_1678102291469_2752_01_000006 @ node-group-1jPmk0002.mrs-qrmc.com (dataPort=32396). | org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1424)
java.lang.OutOfMemoryError: GC overhead limit exceeded**
at org.apache.flink.core.memory.MemorySegmentFactory.wrap(MemorySegmentFactory.java:48) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.table.data.binary.BinaryStringData.fromBytes(BinaryStringData.java:95) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.table.data.StringData.fromBytes(StringData.java:59) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.table.data.columnar.ColumnarRowData.getString(ColumnarRowData.java:123) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:221) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.table.data.RowData$$Lambda$794/901417218.getFieldOrNull(Unknown Source) ~[?:?]
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.table.data.RowData$$Lambda$788/1820263644.getFieldOrNull(Unknown Source) ~[?:?]
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.connectors.hive.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:138) ~[flink-connector-hive_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.connectors.hive.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:107) ~[flink-connector-hive_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at LookupFunction$606.flatMap(Unknown Source) ~[?:?]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.fetchElements(LookupJoinRunner.java:90) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.lambda$processElement$0(LookupJoinRunner.java:76) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner$$Lambda$1805/947327052.run(Unknown Source) ~[?:?]
at org.apache.flink.table.runtime.operators.join.lookup.AbstractLookupJoinRunner.withRetry(AbstractLookupJoinRunner.java:104) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:71) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:31) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:40) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:524) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$743/1473507981.runDefaultAction(Unknown Source) ~[?:?]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:216) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:812) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
at org.apache.flink.runtime.taskmanager.Task$$Lambda$1317/775704149.run(Unknown Source) ~[?:?]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2023-10-07 18:00:15,071 | INFO | [flink-akka.actor.default-dispatcher-22] | Call stack:
at java.lang.Thread.getStackTrace(Thread.java:1564)
- checkpoint失敗
2023-10-07 15:03:42,678 | WARN | [flink-akka.actor.default-dispatcher-36] | **Failed to trigger or complete checkpoint 6 for job 0595b0727d9241894b541fd6e82af814. (0 consecutive failed attempts so far) |** org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:114)
2253 org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
2254 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1909) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2255 at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2256 at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1512) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2257 at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1113) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2258 at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1082) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2259 at org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:590) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2260 at org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:369) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2261 at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:345) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2262 at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:328) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2263 at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:303) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2264 at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2265 at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2266 at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2267 at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2268 at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1543) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2269 at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1119) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2270 at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1059) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2271 at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:760) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2272 at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2273 at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2274 at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2275 at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_332]
2276 at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) ~[?:1.8.0_332]
2277 at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) ~[?:1.8.0_332]
2278 at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2279 at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2280 at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:482) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2281 at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:474) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2282 at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:445) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2283 at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2284 at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2285 at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:505) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2286 at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1378) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2287 at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1373) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2288 at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:155) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2289 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
2290 at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_332]
2291 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2292 at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2293 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2294 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2295 at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2296 at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2297 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2298 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2299 at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2300 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2301 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2302 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2303 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2304 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2305 at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2306 at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2307 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2308 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2309 at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2310 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2311 at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2312 at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
2313 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_332]
2314 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_332]
2315 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_332]
2316 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_332]
2317 2023-10-07 15:03:42,681 | WARN | [flink-akka.actor.default-dispatcher-19] | Remote connection to [null] failed with java.net.ConnectException: Connection refused: node-group-1jPmk0002.mrs-qrmc.com/10.155.0.9:32331 | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2318 2023-10-07 15:03:42,681 | WARN | [flink-akka.actor.default-dispatcher-19] | Association with remote system [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331]] Caused by: [java.net.ConnectException : Connection refused: node-group-1jPmk0002.mrs-qrmc.com/10.155.0.9:32331] | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2319 2023-10-07 15:03:42,681 | INFO | [SourceCoordinator-Source: dws_hljy_logistics_ykt_rytb_m_rec_consume[11]] | Removing registered reader after failure for subtask 0 of source Source: dws_hljy_logistics_ykt_rytb_m_rec_consume[11]. | org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$subtaskFailed$3(SourceCoordinator.java:267 )
2320 2023-10-07 15:03:42,681 | INFO | [flink-akka.actor.default-dispatcher-36] | Source: dws_hljy_logistics_ykt_rytb_m_rec_consume[11]
- akka 超時(shí)
8 2023-10-07 15:02:01,864 | INFO | [flink-scheduler-1] | Triggering Checkpoint 5 for job 0595b0727d9241894b541fd6e82af814 failed due to **java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.triggerCheckpoint(ExecutionAttemptID, long, long, CheckpointOptions))] at recipient [akka.tcp://flink@node-group-1 jpmk0002.mrs-qrmc.com:32331/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout. |** org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpointRequest$10(CheckpointCoordinator.java:681)
2079 2023-10-07 15:02:01,865 | WARN | [Checkpoint Timer] | Failed to trigger or complete checkpoint 5 for job 0595b0727d9241894b541fd6e82af814. (0 consecutive failed attempts so far) | org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:114)
2080 org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
2081 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpointRequest$10(CheckpointCoordinator.java:691) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2082 at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) ~[?:1.8.0_332]
2083 at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) ~[?:1.8.0_332]
2084 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
2085 at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
2086 at org.apache.flink.util.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:914) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2087 at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_332]
2088 at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_332]
2089 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
2090 at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
2091 at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252) ~[?:?]
2092 at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_332]
2093 at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_332]
2094 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
2095 at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
2096 at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2097 at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) ~[?:?]
2098 at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[?:?]
2099 at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) ~[?:?]
2100 at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_332]
2101 at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_332]
2102 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
2103 at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
2104 at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45) ~[?:?]
2105 at akka.dispatch.OnComplete.internal(Future.scala:299) ~[?:?]
2106 at akka.dispatch.OnComplete.internal(Future.scala:297) ~[?:?]
2107 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) ~[?:?]
2108 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) ~[?:?]
2109 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2110 at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) ~[?:?]
2111 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2112 at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2113 at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2114 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2115 at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:729) ~[?:?]
2116 at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:479) ~[?:?]
2117 at akka.dispatch.internal.SameThreadExecutionContext$$anon$1.unbatchedExecute(SameThreadExecutionContext.scala:21) ~[?:?]
2118 at akka.dispatch.BatchingExecutor.execute(BatchingExecutor.scala:133) ~[?:?]
2119 at akka.dispatch.BatchingExecutor.execute$(BatchingExecutor.scala:124) ~[?:?]
2120 at akka.dispatch.internal.SameThreadExecutionContext$$anon$1.execute(SameThreadExecutionContext.scala:20) ~[?:?]
2121 at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:365) ~[?:?]
2122 at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:314) ~[?:?]
2123 at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:318) ~[?:?]
2124 at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270) ~[?:?]
2125 at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
2126 Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.triggerCheckpoint(ExecutionAttemptID, long, long, CheckpointOptions))] at recipient [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
2127 at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.triggerCheckpoint(RpcTaskManagerGateway.java:128) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2128 at org.apache.flink.runtime.executiongraph.Execution.triggerCheckpointHelper(Execution.java:854) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2129 at org.apache.flink.runtime.executiongraph.Execution.triggerCheckpoint(Execution.java:830) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2130 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerTasks(CheckpointCoordinator.java:745) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2131 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerCheckpointRequest(CheckpointCoordinator.java:678) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2132 at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:645) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2133 at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_332]
2134 at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_332]
2135 at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_332]
2136 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_332]
2137 at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_332]
2138 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_332]
2139 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_332]
2140 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_332]
2141 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_332]
作業(yè)優(yōu)化調(diào)整
初步優(yōu)化
- 根據(jù)上述3個(gè)原因,并且不存在數(shù)據(jù)傾斜的情況,適當(dāng)增加內(nèi)存,增加checkpoint超時(shí)/間隔時(shí)間,akka請(qǐng)求超時(shí)時(shí)間
-- 增加內(nèi)存
set 'taskmanager.heap.size' = '5092MB';
-- 增加checkpoint間隔/超時(shí)時(shí)間
set 'execution.checkpointing.timeout'='60min';
set 'execution.checkpointing.interval' = '10 s';
-- 增加異步通信時(shí)間
set 'akka.ask.timeout' = '30 s';
- 優(yōu)化發(fā)現(xiàn)仍然未解決上述問(wèn)題,再次查看flink-sql作業(yè),發(fā)現(xiàn)仍然看到任務(wù)不斷attempt,進(jìn)一步分析發(fā)現(xiàn)flink-sql 數(shù)據(jù)流和外部維表進(jìn)行join時(shí)耗時(shí)較久,將維表去除進(jìn)行測(cè)試,發(fā)現(xiàn)反壓很快消失確定問(wèn)題方向后,進(jìn)行針對(duì)性
二次優(yōu)化
- 根據(jù)發(fā)現(xiàn)的問(wèn)題后,一是內(nèi)存溢出,盡量減少維表數(shù)據(jù)量,根據(jù)業(yè)務(wù)需求、數(shù)據(jù)建模需要,只選擇必須字段加工新的維表,減少join時(shí)的緩存數(shù)據(jù)量
如 維表 dim_a 有 col1,col2,col3,col4,col5,col6,確認(rèn)只需要,col2,,col6,,則可以加工出 col2,col6的維表
- 對(duì)指標(biāo)計(jì)算,為減少數(shù)據(jù)計(jì)算,利用兩階聚合(先分桶和 group key聚合,再根據(jù)group key進(jìn)行聚合)優(yōu)勢(shì),minibatch(批次計(jì)算)優(yōu)勢(shì),在犧牲較低延遲的基礎(chǔ),批次計(jì)算指標(biāo)后,計(jì)算效率得到大幅提升,優(yōu)化前 運(yùn)行4d 40min 仍存在反壓,優(yōu)化后35min后,反壓完全消失
set 'table.exec.mini-batch.enabled'='true';
set 'table.exec.mini-batch.allow-latency'='10 s';
set 'table.exec.mini-batch.size'='5000';
set 'table.optimizer.agg-phase-strategy'='TWO_PHASE';
優(yōu)化對(duì)比
- 優(yōu)化前
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-740191.html
- 優(yōu)化后
- 從下圖發(fā)現(xiàn),經(jīng)過(guò)兩階聚合后,左邊經(jīng)過(guò)mini-batch,兩階聚合優(yōu)化后,處理數(shù)據(jù)量明顯減少
總結(jié)
- 定位問(wèn)題時(shí),需要根據(jù)具體日志信息進(jìn)行分析,結(jié)合flink運(yùn)行原理和監(jiān)控頁(yè)面一步步定位出現(xiàn)問(wèn)題蒜子
- 參考文檔 https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/tuning/
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-740191.html
到了這里,關(guān)于Flink-SQL join 優(yōu)化 -- MiniBatch + local-global的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!