本博客總結(jié)為B站尚硅谷大數(shù)據(jù)Flink2.0調(diào)優(yōu),F(xiàn)link性能優(yōu)化視頻中常見故障排除的的筆記總結(jié)。
1. 非法配置異常
如果看到從 TaskExecutorProcessUtils 或 JobManagerProcessUtils 拋出的 IllegalConfigurationException,通常表明存在無效的配置值(例如負(fù)內(nèi)存大小、大于 1 的分?jǐn)?shù)等)或配置沖突。請重新配置內(nèi)存參數(shù)。
2. Java 堆空間異常
如果報 OutOfMemoryError: Java heap space 異常,通常表示 JVM Heap 太小。可以通過增加總內(nèi)存來增加 JVM 堆大小。也可以直接為 TaskManager 增加任務(wù)堆內(nèi)存或為 JobManager 增加 JVM 堆內(nèi)存。
還可以為 TaskManagers 增加框架堆內(nèi)存,但只有在確定 Flink 框架本身需要更多內(nèi)存時才應(yīng)該更改此選項。
3. 直接緩沖存儲器異常
如果報 OutOfMemoryError: Direct buffer memory 異常,通常表示 JVM 直接內(nèi)存限制太小或存在直接內(nèi)存泄漏。檢查用戶代碼或其他外部依賴項是否使用了 JVM 直接內(nèi)存,以及它是否被正確考慮??梢試L試通過調(diào)整直接堆外內(nèi)存來增加其限制??梢詤⒖既绾螢?TaskManagers、 JobManagers 和 Flink 設(shè)置的 JVM 參數(shù)配置堆外內(nèi)存。
4. 元空間異常
如果報 OutOfMemoryError: Metaspace 異常,通常表示 JVM 元空間限制配置得太小??梢試L試加大 JVM 元空間 TaskManagers 或 JobManagers 選項。
5. 網(wǎng)絡(luò)緩沖區(qū)數(shù)量不足
如果報 IOException: Insufficient number of network buffers 異常,這僅與 TaskManager 相關(guān)。通常表示配置的網(wǎng)絡(luò)內(nèi)存大小不夠大??梢試L試增加網(wǎng)絡(luò)內(nèi)存。
6. 超出容器內(nèi)存異常
如果 Flink 容器嘗試分配超出其請求大?。╕arn 或 Kubernetes)的內(nèi)存,這通常表明 Flink 沒有預(yù)留足夠的本機內(nèi)存。當(dāng)容器被部署環(huán)境殺死時,可以通過使用外部監(jiān)控系統(tǒng)或從錯誤消息中觀察到這一點。
如果在 JobManager 進程中遇到這個問題,還可以通過設(shè)置排除可能的 JVM DirectMemory 泄漏的選項來開啟 JVM Direct Memory 的限制:
jobmanager.memory.enable-jvm-direct-memory-limit: true
如果想手動多分一部分內(nèi)存給 RocksDB 來防止超用,預(yù)防在云原生的環(huán)境因 OOM 被 K8S kill,可將 JVM OverHead 內(nèi)存調(diào)大。
之所以不調(diào)大 Task Off-Heap,是由于目前 Task Off-Heap 是和 Direct Memeory 混在一起的,即使調(diào)大整體,也并不一定會分給 RocksDB 來做 Buffer,所以我們推薦通過調(diào)整 JVM OverHead 來解決內(nèi)存超用的問題。
7. Checkpoint 失敗
Checkpoint 失敗大致分為兩種情況:Checkpoint Decline 和 Checkpoint Expire。
7.1. Checkpoint Decline
我們能從 jobmanager.log 中看到類似下面的日志:
ecline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.
我們可以在 jobmanager.log 中查找 execution id,找到被調(diào)度到哪個 taskmanager 上,類似如下所示:
2019-09-02 16:26:20,972 INFO [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph (100/289) (87b751b1fd90e32af55f02bb2f9a9892) switched DEPLOYING. - XXXXXXXXXXX from SCHEDULED to
2019-09-02 16:26:20,972 INFO [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph - DeployingXXXXXXXXXXX (100/289) (attempt #0) to slotcontainer_e24_1566836790522_8088_04_013155_1 on hostnameABCDE
從上面的日志我們知道該 execution 被調(diào)度到 hostnameABCDE 的 container_e24_1566836790522_8088_04_013155_1 slot 上,接下來我們就可以到 container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找 Checkpoint 失敗的具體原因了。
另外對于 Checkpoint Decline 的情況,有一種情況在這里單獨抽取出來進行介紹: Checkpoint Cancel。
當(dāng)前 Flink 中如果較小的 Checkpoint 還沒有對齊的情況下,收到了更大的 Checkpoint,則會把較小的 Checkpoint 給取消掉。我們可以看到類似下面的日志:
\$taskNameWithSubTaskAndID: Received checkpoint barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.
這個日志表示,當(dāng)前 Checkpoint 19 還在對齊階段,我們收到了 Checkpoint 20 的 barrier。然后會逐級通知到下游的 task checkpoint 19 被取消了,同時也會通知 JM 當(dāng)前 Checkpoint 被 decline 掉了。
在下游 task 收到被 cancelBarrier 的時候,會打印類似如下的日志:
DEBUG $taskNameWithSubTaskAndID: Checkpoint 19 canceled, aborting alignment.
或者
DEBUG $taskNameWithSubTaskAndID: Checkpoint 19 canceled, skipping alignment.
或者
WARN $taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.
上面三種日志都表示當(dāng)前 task 接收到上游發(fā)送過來的 barrierCancel 消息,從而取消了對應(yīng)的 Checkpoint。
7.2. Checkpoint Expire
如果 Checkpoint 做的非常慢,超過了 timeout 還沒有完成,則整個 Checkpoint 也會失敗。當(dāng)一個 Checkpoint 由于超時而失敗時,會在 jobmanager.log 中看到如下的日志:
Checkpoint 1 of job 85d268e6fbc19411185f7e4868a44178 expired before completing.
表示 Chekpoint 1 由于超時而失敗,這個時候可以可以看這個日志后面是否有類似下面的日志:
Received late message for now expired checkpoint attempt 1 from 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.
可以按照 7.1 中的方法找到對應(yīng)的 taskmanager.log 查看具體信息。
我們按照下面的日志把 TM 端的 snapshot 分為三個階段:開始做 snapshot 前,同步階段,異步階段,需要開啟 DEBUG 才能看到:
DEBUG Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)
上面的日志表示 TM 端 barrier 對齊后,準(zhǔn)備開始做 Checkpoint。
2019-08-06 13:43:02,613 DEBU Gorg.apache.flink.runtime.state.AbstractSnapshotStrategy - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@70442baf,checkpointDirectory=xxxxxxxx, taskOwnedStateDirectory=xxxxxx, sharedStateDirectory=xxxxxxxx,
metadataFilePath=xxxxxx, reference=(default),fileStateSizeThreshold=1024}, synchronous part) in thread Thread[Async calls on Source: xxxxxx_source - > Filter (27/70),5,Flink Task Threads] took 0 ms.
上面的日志表示當(dāng)前這個 backend 的同步階段完成,共使用了 0 ms。
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@7908affe,heckpointDirectory=xxxxxx,
taskOwnedStateDirectory=xxxxx, sharedStateDirectory=xxxxx, metadataFilePath=xxxxxx, reference=(default),fileStateSizeThreshold=1024}, asynchronous part) in thread Thread [pool-48-thread-14,5,Flink Task Threads] took 369 ms
上面的日志表示異步階段完成,異步階段使用了 369 ms
在現(xiàn)有的日志情況下,我們通過上面三個日志,定位 snapshot 是開始晚,同步階段做的慢,還是異步階段做的慢。然后再按照情況繼續(xù)進一步排查問題。
8. Checkpoint 慢
Checkpoint 慢的情況如下:比如 Checkpoint interval 1 分鐘,超時 10 分鐘,Checkpoint 經(jīng)常需要做 9 分鐘(我們希望 1 分鐘左右就能夠做完),而且我們預(yù)期 state size 不是非常大。
對于 Checkpoint 慢的情況,我們可以按照下面的順序逐一檢查。
-
Source Trigger Checkpoint 慢
這個一般發(fā)生較少,但是也有可能,因為 source 做 snapshot 并往下游發(fā)送 barrier 的時候,需要搶鎖(Flink1.10 開始,用 mailBox 的方式替代當(dāng)前搶鎖的方式,詳情參考(https://issues.apache.org/jira/browse/FLINK-12477)。如果一直搶不到鎖的話,則可能導(dǎo)致 Checkpoint 一直得不到機會進行。如果在 Source 所在的 taskmanager.log 中找不到開始做 Checkpoint 的 log,則可以考慮是否屬于這種情況,可以通過 jstack 進行進一步確認(rèn)鎖的持有情況。 -
使用增量 Checkpoint
現(xiàn)在 Flink 中 Checkpoint 有兩種模式,全量 Checkpoint 和 增量 Checkpoint , 其中全量 Checkpoint 會把當(dāng)前的 state 全部備份一次到持久化存儲,而增量 Checkpoint,則只備份上一次 Checkpoint 中不存在的 state,因此增量 Checkpoint 每次上傳的內(nèi)容會相對更少,在速度上會有更大的優(yōu)勢。
現(xiàn)在 Flink 中僅在 RocksDBStateBackend 中支持增量 Checkpoint,如果已經(jīng)使用 RocksDBStateBackend,可以通過開啟增量 Checkpoint 來加速。 -
作業(yè)存在反壓或者數(shù)據(jù)傾斜
task 僅在接受到所有的 barrier 之后才會進行 snapshot,如果作業(yè)存在反壓,或者有數(shù)據(jù)傾斜,則會導(dǎo)致全部的 channel 或者某些 channel 的 barrier 發(fā)送慢,從而整體影響 Checkpoint 的時間。 -
Barrier 對齊慢
從前面我們知道 Checkpoint 在 task 端分為 barrier 對齊(收齊所有上游發(fā)送過來的 barrier),然后開始同步階段,再做異步階段。如果 barrier 一直對不齊的話,就不會開始做 snapshot。
barrier 對齊之后會有如下日志打?。?/p>Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)
如果 taskmanager.log 中沒有這個日志,則表示 barrier 一直沒有對齊,接下來我們需要了解哪些上游的 barrier 沒有發(fā)送下來, 如果你使用 At Least Once 的話,可以觀察到下面的日志:
Received barrier for checkpoint 96508 from channel 5
表示該 task 收到了 channel 5 來的 barrier,然后看對應(yīng) Checkpoint,再查看還剩哪些上游的 barrier 沒有接受到。
-
主線程太忙,導(dǎo)致沒機會做 snapshot
在 task 端,所有的處理都是單線程的,數(shù)據(jù)處理和 barrier 處理都由主線程處理,如果主線程在處理太慢(比如使用 RocksDBBackend ,state 操作慢導(dǎo)致整體處理慢),導(dǎo)致 barrier 處理的慢,也會影響整體 Checkpoint 的進度,可以通過火焰圖分析。 -
同步階段做的慢
同步階段一般不會太慢,但是如果我們通過日志發(fā)現(xiàn)同步階段比較慢的話,對于非 RocksDBBackend 我們可以考慮查看是否開啟了異步 snapshot ,如果開啟了異步 snapshot,還是慢,需要看整個 JVM 在干嘛,也可以使用火焰圖分析。對于 RocksDBBackend 來說,我們可以用 iostate 查看磁盤的壓力如何,另外可以查看 tm 端 RocksDB 的 log 的日志如何,查看其中 SNAPSHOT 的時間總共開銷多少。
RocksDB 開始 snapshot 的日志如下:2019/09/10-14:22:55.734684 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:83] Started the snapshot process -- creating snapshot in directory /tmp/flink-io-87c360ce-0b98-48f4-9629-2cf0528d5d53/XXXXXXXXXXX/chk-92729
snapshot 結(jié)束的日志如下:
2019/09/10-14:22:56.001275 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:145] Snapshot DONE. All is good
-
異步階段做的慢
對于異步階段來說,tm 端主要將 state 備份到持久化存儲上,對于非 RocksDBBackend 來說,主要瓶頸來自于網(wǎng)絡(luò),這個階段可以考慮觀察網(wǎng)絡(luò)的 metric,或者對應(yīng)機器上能夠觀察到網(wǎng)絡(luò)流量的情況(比如 iftop)。
對于 RocksDB 來說,則需要從本地讀取文件,寫入到遠程的持久化存儲上,所以不僅需要考慮網(wǎng)絡(luò)的瓶頸,還需要考慮本地磁盤的性能。另外對于 RocksDBBackend 來說,如果覺得網(wǎng)絡(luò)流量不是瓶頸,但是上傳比較慢的話,還可以嘗試考慮開啟多線程上傳功能(Flink 1.13 開始,state.backend.rocksdb.checkpoint.transfer.thread.num 默認(rèn)值是 4) 。
9. Kafka 動態(tài)發(fā)現(xiàn)分區(qū)
當(dāng) FlinkKafkaConsumer 初始化時,每個 subtask 會訂閱一批 partition,但是在 Flink 任務(wù)運行過程中,如果被訂閱的 topic 創(chuàng)建了新的 partition,F(xiàn)linkKafkaConsumer 如何實現(xiàn)動態(tài)發(fā)現(xiàn)新創(chuàng)建的 partition 并消費呢?
在使用 FlinkKafkaConsumer 時,可以開啟 partition 的動態(tài)發(fā)現(xiàn)。通過 Properties 指定參數(shù)開啟(單位是毫秒):
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS
該參數(shù)表示間隔多久檢測一次是否有新創(chuàng)建的 partition。默認(rèn)值是 Long 的最小值,表示不開啟,大于 0 表示開啟。開啟時會啟動一個線程根據(jù)傳入的 interval 定期獲取 Kafka 最新的元數(shù)據(jù),新 partition 對應(yīng)的那一個 subtask 會自動發(fā)現(xiàn)并從 earliest 位置開始消費,新創(chuàng)建的 partition 對其他 subtask 并不會產(chǎn)生影響。
代碼如下所示:
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVA_L_MILLIS, 30 * 1000 + "");
10. Watermark 不更新
如果數(shù)據(jù)源中的某一個分區(qū)/分片在一段時間內(nèi)未發(fā)送事件數(shù)據(jù),則意味著 WatermarkGenerator 也不會獲得任何新數(shù)據(jù)去生成 watermark。我們稱這類數(shù)據(jù)源為空閑輸入或空閑源。在這種情況下,當(dāng)某些其他分區(qū)仍然發(fā)送事件數(shù)據(jù)的時候就會出現(xiàn)問題。比如 Kafka 的Topic 中,由于某些原因,造成個別 Partition 一直沒有新的數(shù)據(jù)。由于下游算子 watermark 的計算方式是取所有不同的上游并行數(shù)據(jù)源 watermark 的最小值,則其 watermark 將不會發(fā)生變化,導(dǎo)致窗口、定時器等不會被觸發(fā)。
為了解決這個問題,可以使用 WatermarkStrategy 來檢測空閑輸入并將其標(biāo)記為空閑狀態(tài)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>("flinktest", new SimpleStringSchema(), properties);
kafkaSourceFunction.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMinutes(2))
.withIdleness(Duration.ofMinutes(5))
);
env.addSource(kafkaSourceFunction);
11. 依賴沖突
ClassNotFoundException
NoSuchMethodError
IncompatibleClassChangeError
...
一般都是因為用戶依賴第三方包的版本與 Flink 框架依賴的版本有沖突導(dǎo)致。根據(jù)報錯信息中的類名,定位到?jīng)_突的 jar 包,idea 可以借助 maven helper 插件查找沖突的有哪些。打包插件建議使用 maven-shade-plugin。
12. 超出文件描述符限制
java.io.IOException: Too many open files
首先檢查 Linux 系統(tǒng) ulimit -n 的文件描述符限制,再注意檢查程序內(nèi)是否有資源(如各種連接池的連接)未及時釋放。值得注意的是,低版本 Flink 使用 RocksDB 狀態(tài)后端也有可能 會拋出這個異常,此時需修改 flink-conf.yaml 中的 state.backend.rocksdb.files.open 參數(shù),如果不限制,可以改為-1 (1.13 默認(rèn)就是-1) 。
13. 臟數(shù)據(jù)導(dǎo)致數(shù)據(jù)轉(zhuǎn)發(fā)失敗
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
該異常幾乎都是由于程序業(yè)務(wù)邏輯有誤,或者數(shù)據(jù)流里存在未處理好的臟數(shù)據(jù)導(dǎo)致的,繼續(xù)向下追溯異常棧一般就可以看到具體的出錯原因,比較常見的如 POJO 內(nèi)有空字段,或者抽取事件時間的時間戳為 null 等。
14. 通訊超時
akka.pattern.AskTimeoutException: Ask timed out on [Actor [akka://...]] after [10000 ms]
Akka 超時導(dǎo)致,一般有兩種原因:一是集群負(fù)載比較大或者網(wǎng)絡(luò)比較擁塞,二是業(yè)務(wù)邏輯同步調(diào)用耗時的外部服務(wù)。如果負(fù)載或網(wǎng)絡(luò)問題無法徹底緩解,需考慮調(diào)大 akka.ask.timeout 參數(shù)的值(默認(rèn)只有 10 秒);另外,調(diào)用外部服務(wù)時盡量異步操作(Async I/O)。
15. 提交的flink程序重啟失敗
15.1. 錯誤日志
從圖中可以看出,flink程序在自動重啟時,需要尋找 /tmp 目錄下面的一些文件。但是由于 linux 系統(tǒng)的 /tmp 目錄會被很多程序以及操作系統(tǒng)本身用到,所以很難避免文件的誤刪除操作。
這個錯誤一般只會出現(xiàn)在使用 flink standalone 集群時發(fā)生。
15.2. 解決方案
在flink的 flink-conf.yaml 文件中,有個配置項叫 io.tmp.dirs ,該配置用于決定程序運行過程中一些臨時文件保存的目錄。建議將該目錄配置為 flink 專用目錄。
16. flink集群無法通過 stop-cluster.sh 腳本停止
16.1. 錯誤現(xiàn)象
?
通過腳本停止集群,發(fā)現(xiàn)無法在對應(yīng)的機器上找到對應(yīng)的flink服務(wù),出現(xiàn)在 flink standalone 模式集群。
16.2. 解決方案
在flink的安裝目錄下的 /bin 目錄下有個 config.sh 腳本文件,里面有一項配置用來配置 flink 服務(wù)的 pid 文件目錄,配置名稱為: DEFAULT_ENV_PID_DIR ,默認(rèn)值為 /tmp 。由于 linux 系統(tǒng)的 /tmp 目錄會被很多程序以及操作系統(tǒng)本身用到,所以很難避免文件的誤刪除操作。出現(xiàn)上述日志就是因為 pid 文件被刪除,導(dǎo)致 flink 找不到機器上的進程 pid 編號所致。因此我們需要修改該默認(rèn)配置為一個 flink 專用目錄。
17. flink sql on hive 太多max函數(shù)
flink版本:1.11.1
運行了一個sql語句,查詢 hive 表的數(shù)據(jù),有 group by
,然后對很多字段取最大值,示例代碼如下:
select user_id, max(column1), max(column2), max(column3)
from test.test
group by user_id
;
select 后面的 max 字段有 150 多個,然后運行代碼報錯如下:
java.lang.RuntimeException: Could not instantiate generated class 'LocalSortAggregateWithKeys$1975'
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:470)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.\<init\>(OperatorChain.java:155)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException:org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)
... 14 more
Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache\$Segment.get(LocalCache.java:2203)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache\$LocalManualCache.get(LocalCache.java:4739)
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
... 16 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
at org.apache.flink.table.runtime.generated.CompileUtils.lambda\$compile\$1(CompileUtils.java:66)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache\$LocalManualCache\$1.load(LocalCache.java:4742)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache\$LoadingValueReference.loadFuture(LocalCache.java:3527)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache\$Segment.loadSync(LocalCache.java:2319)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache\$Segment.lockedGetOrLoad(LocalCache.java:2282)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache\$Segment.get(LocalCache.java:2197)
... 19 more
Caused by: java.lang.StackOverflowError
at org.codehaus.janino.CodeContext.extract16BitValue(CodeContext.java:700)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:478)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:557)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:557)
主要錯誤是:StackOverflowError,棧溢出。這個錯誤意思是說,方法嵌套太多。
之后試了一下,發(fā)現(xiàn)是一個 sql 里面的聚合函數(shù)不能寫太多,最大大概是 120 左右。建議聚合函數(shù)個數(shù)超過 100 個,就寫兩個 sql ,然后再把兩個 sql 的結(jié)果進行合并。
18. 調(diào)用太多次get_json_value函數(shù)
將處理完的數(shù)據(jù)寫入 pulsar 主題,主題字段特別多,超過 200 個,通過調(diào)用 get_json_value (類似于 hive 中的 get_json_object)函數(shù),將處理完的結(jié)果數(shù)據(jù)(為 json 字符串)的內(nèi)容提取出來,然后發(fā)現(xiàn)處理速度很慢。
解決方式就是,結(jié)果就保存一個字段,將 json 字符串輸入,下游處理時再通過 get_json_value 函數(shù)獲取需要的字段值,或者是在自定義 UDF 中獲取。文章來源:http://www.zghlxwxcb.cn/news/detail-404488.html
19. 其他
Flink on YARN(下):常見問題與排查思路文章來源地址http://www.zghlxwxcb.cn/news/detail-404488.html
到了這里,關(guān)于flink常見故障排除的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!