
Flink-SQL join optimization -- MiniBatch + local-global


Background

  • Problem 1. While recently developing a Flink SQL job, I found that after startup the tasks kept restarting. After running for a while, the job hit a container heartbeat timeout and an out-of-memory error (GC overhead limit exceeded), and it could no longer work normally.
2023-10-07 14:53:30,408 | INFO  | [flink-akka.actor.default-dispatcher-29] | Stopping worker container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041). | org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.internalStopWorker(ActiveResourceManager.java:461)
2023-10-07 14:53:30,408 | INFO  | [flink-akka.actor.default-dispatcher-29] | Stopping container container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041). | org.apache.flink.yarn.YarnResourceManagerDriver.releaseResource(YarnResourceManagerDriver.java:298)
2023-10-07 14:53:30,409 | INFO  | [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #1] | Processing Event EventType: STOP_CONTAINER for Container container_e03_1678102291469_2749_01_000002 | org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor.run(NMClientAsyncImpl.java:955)
2023-10-07 14:53:30,824 | WARN  | [flink-akka.actor.default-dispatcher-29] | Remote connection to [/10.155.0.9:42366] failed with java.io.IOException: Connection reset by peer | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 14:53:30,825 | WARN  | [flink-akka.actor.default-dispatcher-29] | Association with remote system [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331] has failed, address is now gated for [50] ms. Reason: [Disassociated]  | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 14:53:30,825 | WARN  | [flink-metrics-6] | Association with remote system [akka.tcp://flink-metrics@node-group-1jpmk0002.mrs-qrmc.com:28852] has failed, address is now gated for [50] ms. Reason: [Disassociated]  | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 14:53:32,171 | INFO  | [flink-akka.actor.default-dispatcher-29] | Received 1 containers. | org.apache.flink.yarn.YarnResourceManagerDriver$YarnContainerEventHandler.lambda$onContainersAllocated$1(YarnResourceManagerDriver.java:620)
2023-10-07 14:53:32,172 | INFO  | [flink-akka.actor.default-dispatcher-29] | Received 1 containers with priority 1, 1 pending container requests. | org.apache.flink.yarn.YarnResourceManagerDriver.onContainersOfPriorityAllocated(YarnResourceManagerDriver.java:352)
2023-10-07 14:53:32,172 | INFO  | [flink-akka.actor.default-dispatcher-29] | Removing container request Capability[<memory:4096, vCores:3>]Priority[1]AllocationRequestId[0]ExecutionTypeRequest[{Execution Type: GUARANTEED, Enforce Execution Type: false}]Resource Profile[null]. | org.apache.flink.yarn.YarnResourceManagerDriver.removeContainerRequest(YarnResourceManagerDriver.java:405)
2023-10-07 14:53:32,172 | INFO  | [flink-akka.actor.default-dispatcher-29] | Accepted 1 requested containers, returned 0 excess containers, 0 pending container requests of resource <memory:4096, vCores:3>. | org.apache.flink.yarn.YarnResourceManagerDriver.onContainersOfPriorityAllocated(YarnResourceManagerDriver.java:392)
2023-10-07 14:53:32,172 | INFO  | [cluster-io-thread-4] | TaskExecutor container_e03_1678102291469_2749_01_000003(node-group-1jPmk0002.mrs-qrmc.com:8041) will be started on node-group-1jPmk0002.mrs-qrmc.com with TaskExecutorProcessSpec {cpuCores=3.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemorySize=1.340gb (1438814063 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=409.600mb (429496736 bytes), numSlots=6}. | org.apache.flink.yarn.YarnResourceManagerDriver.createTaskExecutorLaunchContext(YarnResourceManagerDriver.java:472)
2023-10-07 14:53:32,178 | INFO  | [cluster-io-thread-4] | Creating container launch context for TaskManagers | org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:500)
2023-10-07 14:53:32,179 | INFO  | [cluster-io-thread-4] | Starting TaskManagers | org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:520)
2023-10-07 14:53:32,182 | INFO  | [flink-akka.actor.default-dispatcher-29] | Requested worker container_e03_1678102291469_2749_01_000003(node-group-1jPmk0002.mrs-qrmc.com:8041) with resource spec WorkerResourceSpec {cpuCores=3.0, taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb (1438814063 bytes), numSlots=6}. | org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.lambda$requestNewWorker$1(ActiveResourceManager.java:419)
2023-10-07 14:53:32,182 | INFO  | [org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl #2] | Processing Event EventType: START_CONTAINER for Container container_e03_1678102291469_2749_01_000003 | org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl$ContainerEventProcessor.run(NMClientAsyncImpl.java:955)
2023-10-07 14:53:32,554 | INFO  | [flink-akka.actor.default-dispatcher-29] | Heartbeat of TaskManager with id container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041) timed out. | org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1372)
2023-10-07 14:53:32,558 | INFO  | [flink-akka.actor.default-dispatcher-29] | ConstraintEnforcer[20] (1/6) (1a65cb345c504de7b9704270217856a7) switched from RUNNING to FAILED on container_e03_1678102291469_2749_01_000002 @ node-group-1jPmk0002.mrs-qrmc.com (dataPort=32396). | org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1424)
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e03_1678102291469_2749_01_000002(node-group-1jPmk0002.mrs-qrmc.com:8041) timed out.
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1373) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:155) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_332]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_332]
2023-10-07 14:53:32,567 | INFO  | [flink-akka.actor.default-dispatcher-29] | Call stack:
    at java.lang.Thread.getStackTrace(Thread.java:1564)
    at org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1432)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1124)
    at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:908)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.updateStateInternal(DefaultExecutionGraph.java:1318)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.updateState(DefaultExecutionGraph.java:1277)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:733)
    at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1543)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1119)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1059)
    at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:760)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
    at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
    at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
    at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
    at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
    at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:482)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:474)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:445)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
    at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:505)
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1378)
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1373)

  • Problem 2. In runs where the container heartbeat timeout did not occur, the job ran slowly: after more than a day it was still under backpressure.
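The TaskExecutorProcessSpec line in the log above gives the full memory breakdown of the TaskManager container. As a rough sketch (the byte values are copied from that log line; the function and variable names are made up for illustration), the arithmetic below checks that the components add up to the 4096 MB container request and shows how much JVM heap is actually available. Note that Flink does not partition heap between slots, so the per-slot figure is only an average share, not a limit.

```python
# Byte values copied from the TaskExecutorProcessSpec log line above.
SPEC_BYTES = {
    "frameworkHeapSize": 134217728,
    "frameworkOffHeapSize": 134217728,
    "taskHeapSize": 1530082070,
    "taskOffHeapSize": 0,
    "networkMemSize": 359703515,
    "managedMemorySize": 1438814063,
    "jvmMetaspaceSize": 268435456,
    "jvmOverheadSize": 429496736,
}
NUM_SLOTS = 6  # numSlots from the same log line
MIB = 1024 * 1024


def summarize(spec, num_slots):
    """Return total process memory, JVM heap, and average task heap per slot, in MiB."""
    total = sum(spec.values())
    heap = spec["frameworkHeapSize"] + spec["taskHeapSize"]
    return {
        "total_mib": total / MIB,
        "heap_mib": heap / MIB,
        # Heap is shared by all slots in one JVM; this is an average, not a quota.
        "task_heap_per_slot_mib": spec["taskHeapSize"] / num_slots / MIB,
    }


summary = summarize(SPEC_BYTES, NUM_SLOTS)
for name, value in summary.items():
    print(f"{name}: {value:.1f}")
```

The components sum to exactly 4096 MiB, of which only about 1587 MiB is JVM heap shared by six slots. Anything that holds a large on-heap structure in every parallel subtask therefore has limited room in this layout.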

Problem analysis

  • The logs show an out-of-memory error:
2023-10-07 17:58:27,526 | INFO  | [Checkpoint Timer] | Triggering checkpoint 11 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1696672707523 for job f80d756ccb03ed8d6c19d114e0ba9e63. | org.apache.flink.runtime.checkpoint.CheckpointCoordinator.createPendingCheckpoint(CheckpointCoordinator.java:832)
2023-10-07 17:59:31,393 | INFO  | [flink-metrics-4] | No response from remote for outbound association. Handshake timed out after [60000 ms]. | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$3(Slf4jLogger.scala:96)
2023-10-07 17:59:31,396 | WARN  | [flink-metrics-4] | Association with remote system [akka.tcp://flink-metrics@node-group-1jpmk0002.mrs-qrmc.com:28852] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@node-group-1jpmk0002.mrs-qrmc.com:28852]] Caused by: [No response from remote for outbound association. Handshake timed out after [60000 ms].] | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
**2023-10-07 18:00:15,005 | INFO  | [flink-akka.actor.default-dispatcher-22] | LookupJoin[29] (3/3) (803c659fe1f2ef1c5886f73bab51a914) switched from RUNNING to FAILED on container_e03_1678102291469_2752_01_000006 @ node-group-1jPmk0002.mrs-qrmc.com (dataPort=32396). | org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1424)
java.lang.OutOfMemoryError: GC overhead limit exceeded**
	at org.apache.flink.core.memory.MemorySegmentFactory.wrap(MemorySegmentFactory.java:48) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.binary.BinaryStringData.fromBytes(BinaryStringData.java:95) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.StringData.fromBytes(StringData.java:59) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.columnar.ColumnarRowData.getString(ColumnarRowData.java:123) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$1(RowData.java:221) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.RowData$$Lambda$794/901417218.getFieldOrNull(Unknown Source) ~[?:?]
	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296) ~[flink-table-api-java-uber-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.data.RowData$$Lambda$788/1820263644.getFieldOrNull(Unknown Source) ~[?:?]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connectors.hive.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:138) ~[flink-connector-hive_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connectors.hive.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:107) ~[flink-connector-hive_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at LookupFunction$606.flatMap(Unknown Source) ~[?:?]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.fetchElements(LookupJoinRunner.java:90) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.lambda$processElement$0(LookupJoinRunner.java:76) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner$$Lambda$1805/947327052.run(Unknown Source) ~[?:?]
	at org.apache.flink.table.runtime.operators.join.lookup.AbstractLookupJoinRunner.withRetry(AbstractLookupJoinRunner.java:104) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:71) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:31) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:40) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:524) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$743/1473507981.runDefaultAction(Unknown Source) ~[?:?]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:216) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:812) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task$$Lambda$1317/775704149.run(Unknown Source) ~[?:?]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
2023-10-07 18:00:15,071 | INFO  | [flink-akka.actor.default-dispatcher-22] | Call stack:
    at java.lang.Thread.getStackTrace(Thread.java:1564)
  • Checkpoint failure
2023-10-07 15:03:42,678 | WARN  | [flink-akka.actor.default-dispatcher-36] | **Failed to trigger or complete checkpoint 6 for job 0595b0727d9241894b541fd6e82af814. (0 consecutive failed attempts so far) |** org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:114)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1909) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1512) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1113) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1082) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:590) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:369) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:345) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:328) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:303) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1543) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1119) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1059) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:760) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) ~[?:1.8.0_332]
        at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:482) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:474) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:445) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:505) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1378) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1373) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:155) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_332]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_427efea8-e9d4-4177-abbe-967ec8f15a58.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_332]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_332]
2023-10-07 15:03:42,681 | WARN  | [flink-akka.actor.default-dispatcher-19] | Remote connection to [null] failed with java.net.ConnectException: Connection refused: node-group-1jPmk0002.mrs-qrmc.com/10.155.0.9:32331 | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 15:03:42,681 | WARN  | [flink-akka.actor.default-dispatcher-19] | Association with remote system [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331]] Caused by: [java.net.ConnectException: Connection refused: node-group-1jPmk0002.mrs-qrmc.com/10.155.0.9:32331] | akka.event.slf4j.Slf4jLogger$$anonfun$receive$1.$anonfun$applyOrElse$2(Slf4jLogger.scala:90)
2023-10-07 15:03:42,681 | INFO  | [SourceCoordinator-Source: dws_hljy_logistics_ykt_rytb_m_rec_consume[11]] | Removing registered reader after failure for subtask 0 of source Source: dws_hljy_logistics_ykt_rytb_m_rec_consume[11]. | org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$subtaskFailed$3(SourceCoordinator.java:267)
2023-10-07 15:03:42,681 | INFO  | [flink-akka.actor.default-dispatcher-36] | Source: dws_hljy_logistics_ykt_rytb_m_rec_consume[11]
  • Akka timeout
2023-10-07 15:02:01,864 | INFO  | [flink-scheduler-1] | Triggering Checkpoint 5 for job 0595b0727d9241894b541fd6e82af814 failed due to java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.triggerCheckpoint(ExecutionAttemptID, long, long, CheckpointOptions))] at recipient [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout. | org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpointRequest$10(CheckpointCoordinator.java:681)
2023-10-07 15:02:01,865 | WARN  | [Checkpoint Timer] | Failed to trigger or complete checkpoint 5 for job 0595b0727d9241894b541fd6e82af814. (0 consecutive failed attempts so far) | org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:114)
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpointRequest$10(CheckpointCoordinator.java:691) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
        at org.apache.flink.util.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:914) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252) ~[?:?]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
        at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) ~[?:?]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[?:?]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) ~[?:?]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_332]
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45) ~[?:?]
        at akka.dispatch.OnComplete.internal(Future.scala:299) ~[?:?]
        at akka.dispatch.OnComplete.internal(Future.scala:297) ~[?:?]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) ~[?:?]
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) ~[?:?]
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) ~[?:?]
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-scala_2.12-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:729) ~[?:?]
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:479) ~[?:?]
        at akka.dispatch.internal.SameThreadExecutionContext$$anon$1.unbatchedExecute(SameThreadExecutionContext.scala:21) ~[?:?]
        at akka.dispatch.BatchingExecutor.execute(BatchingExecutor.scala:133) ~[?:?]
        at akka.dispatch.BatchingExecutor.execute$(BatchingExecutor.scala:124) ~[?:?]
        at akka.dispatch.internal.SameThreadExecutionContext$$anon$1.execute(SameThreadExecutionContext.scala:20) ~[?:?]
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:365) ~[?:?]
        at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:314) ~[?:?]
        at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:318) ~[?:?]
        at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270) ~[?:?]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.triggerCheckpoint(ExecutionAttemptID, long, long, CheckpointOptions))] at recipient [akka.tcp://flink@node-group-1jpmk0002.mrs-qrmc.com:32331/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
        at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.triggerCheckpoint(RpcTaskManagerGateway.java:128) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.triggerCheckpointHelper(Execution.java:854) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.executiongraph.Execution.triggerCheckpoint(Execution.java:830) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerTasks(CheckpointCoordinator.java:745) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerCheckpointRequest(CheckpointCoordinator.java:678) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:645) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_332]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_332]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_332]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_332]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_332]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_332]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_332]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_332]

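Job tuning

Initial tuning

  • Given the three causes above, and since there was no data skew, we moderately increased memory, the checkpoint timeout and interval, and the Akka request timeout.

-- Increase memory
-- Note: taskmanager.heap.size is a legacy key; Flink 1.10 and later document taskmanager.memory.process.size instead
set 'taskmanager.heap.size' = '5092MB';

-- Increase the checkpoint interval and timeout
set 'execution.checkpointing.timeout'='60min';
set 'execution.checkpointing.interval' = '10 s';

-- Increase the timeout for asynchronous (Akka ask) calls
set 'akka.ask.timeout' = '30 s';
  • These changes did not solve the problem: the flink-sql job still showed tasks going through attempt after attempt. Further analysis showed that joining the data stream with the external dimension table was slow. When we removed the dimension table as a test, the backpressure disappeared quickly. With the direction confirmed, we moved on to targeted optimization.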

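Second round of tuning

  • Based on what we found, the first issue to address was the memory overflow: shrink the dimension table as much as possible. Guided by the business requirements and the data model, build a new dimension table that contains only the required columns, which reduces the amount of data cached during the join.
    For example, if dimension table dim_a has columns col1, col2, col3, col4, col5, col6 and only col2 and col6 are needed, build a dimension table containing just col2 and col6.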
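The column pruning described above can be sketched roughly as follows. Only `dim_a`, `col2`, and `col6` come from the article; everything else is an assumption made for illustration: the names `dim_a_slim`, `orders`, `order_id`, and `proc_time`, the use of the JDBC connector against a MySQL-compatible store, the URL, the cache settings, and the choice of `col2` as the join key. The original job's connector and options are not known.

```sql
-- Step 1 (run in the database that hosts the dimension table, assumed MySQL-compatible):
-- materialize a narrow copy that keeps only the columns the job needs.
CREATE TABLE dim_a_slim AS
SELECT col2, col6
FROM dim_a;

-- Step 2 (Flink SQL): a hypothetical stream; datagen stands in for the real source.
CREATE TABLE orders (
    order_id  BIGINT,
    col2      STRING,
    proc_time AS PROCTIME()          -- processing-time attribute required by the lookup join
) WITH (
    'connector' = 'datagen'
);

-- Step 3 (Flink SQL): register the narrow table as a lookup source.
-- URL and cache options are placeholders, not the original job's settings.
CREATE TABLE dim_a_slim (
    col2 STRING,
    col6 STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://dim-host:3306/dim_db',
    'table-name' = 'dim_a_slim',
    'lookup.cache.max-rows' = '100000',
    'lookup.cache.ttl' = '10 min'
);

-- Step 4 (Flink SQL): lookup join; each cached row now holds two columns instead of six.
SELECT o.order_id, d.col6
FROM orders AS o
JOIN dim_a_slim FOR SYSTEM_TIME AS OF o.proc_time AS d
    ON o.col2 = d.col2;
```

Whether the narrow table is built once, refreshed on a schedule, or maintained by another job depends on how often `dim_a` changes; the article does not say which approach was used.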
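  • For the metric computation, to cut the amount of work, we combined two-phase aggregation (aggregate by bucket and group key first, then by group key alone) with mini-batch (batched processing). At the cost of somewhat higher latency, computing the metrics in batches improved efficiency substantially: before the change, backpressure was still present after running for 4d 40min; after the change, backpressure had disappeared completely within 35 min.
-- Buffer input records and process them in mini-batches
set 'table.exec.mini-batch.enabled'='true';
-- Flush a mini-batch after at most 10 s
set 'table.exec.mini-batch.allow-latency'='10 s';
-- Maximum number of records buffered per mini-batch
set 'table.exec.mini-batch.size'='5000';
-- Run aggregations as local aggregation followed by global aggregation
set 'table.optimizer.agg-phase-strategy'='TWO_PHASE';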
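To make the "bucket first, then merge" description concrete, here is a hand-written query with that shape, modelled on the manual rewrite shown in the Flink tuning guide linked in the summary. It is only a sketch: the table `events`, its columns, and the bucket count 1024 are hypothetical. Note that the settings above do not require rewriting queries this way; with `TWO_PHASE` and mini-batch enabled, the planner itself inserts a local pre-aggregation before the shuffle for aggregate functions that support merging.

```sql
-- Hypothetical source table; datagen stands in for the real stream.
CREATE TABLE events (
    group_key STRING,
    user_id   STRING,
    amount    BIGINT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100'
);

-- Phase 1 (inner query): partial sums per (group_key, bucket), which spreads
-- the rows of a single group_key across up to 1024 buckets.
-- Phase 2 (outer query): merge the partial sums per group_key.
SELECT group_key, SUM(partial_amount) AS total_amount
FROM (
    SELECT
        group_key,
        SUM(amount) AS partial_amount
    FROM events
    GROUP BY group_key, MOD(HASH_CODE(user_id), 1024)
) AS bucketed
GROUP BY group_key;
```

The second phase only sees one partial result per bucket rather than every input row, which is the reason the downstream data volume drops.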

Before/after comparison

  • Before optimization

    [Figure: job before optimization]

  • After optimization
    [Figure: job after optimization]
  • As the figure below shows, the left side, with mini-batch and two-phase aggregation applied, processes noticeably less data.
    [Figure: comparison of processed data volume]

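Summary

  • When tracking down a problem, analyze the specific log messages and combine them with an understanding of how Flink runs and with the monitoring pages to narrow down, step by step, which operator is at fault.
  • Reference: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/tuning/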