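Preface
This post continues from the previous one, [Flink Real-Time Data Warehouse] Requirement 1: User Attribute Dimension Table Processing - Flink CDC from MySQL to HBase, Experiment and Error Analysis: http://t.csdn.cn/bk96r
A day later I reran the job that writes the data to HBase and got a Kafka error, even though this code does not use Kafka at all. The cause was a dependency conflict: earlier that day I had added a Kafka dependency for another project.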
Full error output
+--------+
| result |
+--------+
| OK |
+--------+
1 row in set
[WARN ] 2023-07-23 12:48:34,083(0) --> [main] org.apache.flink.runtime.webmonitor.WebMonitorUtils$LogFileLocation.find(WebMonitorUtils.java:82): Log file environment variable 'log.file' is not set.
[WARN ] 2023-07-23 12:48:34,088(5) --> [main] org.apache.flink.runtime.webmonitor.WebMonitorUtils$LogFileLocation.find(WebMonitorUtils.java:88): JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
[WARN ] 2023-07-23 12:48:35,781(1698) --> [Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) -> NotNullEnforcer(fields=[id]) -> Sink: Collect table sink (1/1)#0] org.apache.flink.runtime.metrics.groups.TaskMetricGroup.getOrAddOperator(TaskMetricGroup.java:154): The operator name Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) exceeded the 80 characters length limit and was truncated.
[WARN ] 2023-07-23 12:48:36,481(2398) --> [Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) -> NotNullEnforcer(fields=[id]) -> Sink: Collect table sink (1/1)#0] org.apache.kafka.connect.runtime.WorkerConfig.logPluginPathConfigProviderWarning(WorkerConfig.java:420): Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.
[ERROR] 2023-07-23 12:48:36,487(2404) --> [debezium-engine] com.ververica.cdc.debezium.internal.Handover.reportError(Handover.java:147): Reporting error:
java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils
at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.start(FlinkOffsetBackingStore.java:152)
at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.configure(FlinkOffsetBackingStore.java:71)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:690)
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.ThreadUtils
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 7 more
[WARN ] 2023-07-23 12:48:36,499(2416) --> [Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) -> NotNullEnforcer(fields=[id]) -> Sink: Collect table sink (1/1)#0] org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1074): Source: TableSourceScan(table=[[default_catalog, default_database, ums_member]], fields=[id, username, phone, status, create_time, gender, birthday, city, job, source_type]) -> NotNullEnforcer(fields=[id]) -> Sink: Collect table sink (1/1)#0 (472d9a4f02e261cfd2f115da78d97e03) switched from RUNNING to FAILED with failure cause: java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils
at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.start(FlinkOffsetBackingStore.java:152)
at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.configure(FlinkOffsetBackingStore.java:71)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:690)
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.ThreadUtils
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 7 more
[WARN ] 2023-07-23 12:48:36,581(2498) --> [main] org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:215): Failed to get job status so we assume that the job has terminated. Some data might be lost.
java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:852)
at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:752)
at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:705)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:90)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:203)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:117)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
at demo.UserInfo2Hbase.main(UserInfo2Hbase.java:93)
[WARN ] 2023-07-23 12:48:36,582(2499) --> [main] org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:215): Failed to get job status so we assume that the job has terminated. Some data might be lost.
java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:852)
at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:752)
at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:705)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:90)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:203)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.cancelJob(CollectResultFetcher.java:225)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.close(CollectResultFetcher.java:150)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:108)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
at demo.UserInfo2Hbase.main(UserInfo2Hbase.java:93)
Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
at demo.UserInfo2Hbase.main(UserInfo2Hbase.java:93)
Caused by: java.io.IOException: Failed to fetch job execution result
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
... 5 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
... 7 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils
at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.start(FlinkOffsetBackingStore.java:152)
at com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore.configure(FlinkOffsetBackingStore.java:71)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:690)
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:188)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.ThreadUtils
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 7 more
Process finished with exit code 1
Flink test code
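package demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Package and class name are taken from the stack trace above (demo.UserInfo2Hbase).
public class UserInfo2Hbase {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Register the MySQL table as a Flink CDC source; print() shows the "OK" result of the DDL.
        tenv.executeSql("CREATE TABLE ums_member (\n" +
                " id BIGINT, \n" +
                " username STRING, \n" +
                " phone STRING, \n" +
                " status int, \n" +
                " create_time timestamp(3), \n" +
                " gender \t\tint, \n" +
                " birthday\tdate, \n" +
                " city \t\tSTRING, \n" +
                " job \t\tSTRING , \n" +
                " source_type INT , \n" +
                " PRIMARY KEY(id) NOT ENFORCED\n" +
                " ) WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'hadoop10',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = '0000',\n" +
                " 'database-name' = 'db1',\n" +
                //" 'scan.startup.mode' = 'latest-offset',\n" +
                " 'scan.incremental.snapshot.enabled' = 'false',\n" +
                " 'table-name' = 'ums_member')").print();

        // Read the CDC table and print the rows; this is the call that fails in the log above.
        tenv.executeSql("select * from ums_member").print();
    }
}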
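Solution
I commented out the Kafka dependency and reran the job, but it still failed with the same error.
I searched all over the web and found a wide variety of suggested fixes.
In the end I simply restarted IDEA 2020, and that resolved it, presumably because the IDE was still running with a stale classpath until the restart.
The data was written to HBase successfully.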
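To confirm that a fix like this has actually taken effect, it can help to check whether the missing class is visible on the runtime classpath and which JAR provides it. The following is a minimal sketch and was not part of the original troubleshooting. The class name `ClasspathCheck` and the method `locate` are hypothetical; the two probed class names come from the log above.

```java
import java.security.CodeSource;

// Hypothetical helper (not from the original post): reports where a class is loaded from.
public class ClasspathCheck {

    /** Returns the JAR or directory a class is loaded from, or "NOT FOUND" if it is missing. */
    public static String locate(String className) {
        try {
            // initialize=false: only resolve the class, do not run its static initializers.
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // JDK core classes have no code source.
            return src == null ? "JDK runtime" : src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "NOT FOUND";
        }
    }

    public static void main(String[] args) {
        // Both class names appear in the log above: the first is the one reported missing,
        // the second was loaded successfully (it emitted the 'plugin.path' warning).
        String[] probes = {
            "org.apache.kafka.common.utils.ThreadUtils",
            "org.apache.kafka.connect.runtime.WorkerConfig"
        };
        for (String name : probes) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Run it from the same IDE module as the Flink job so that it sees the same classpath. If `ThreadUtils` is reported as not found while `WorkerConfig` resolves, one plausible explanation is that a Kafka JAR without this class is being picked up instead of the one the CDC connector expects. Printing the JAR locations shows which artifact is involved.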