国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink開發(fā)常見問題 —— flink-kafka 依賴版本沖突問題

這篇具有很好參考價值的文章主要介紹了flink開發(fā)常見問題 —— flink-kafka 依賴版本沖突問題。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

問題描述

由于 flink / kafka 的版本不斷更新,創(chuàng)建項目的時候就應(yīng)當考慮清楚這幾個依賴庫的版本問題,盡可能地與實際場景保持一致,比如服務(wù)器上部署的 kafka 是哪個版本,flink 是哪個版本,從而確定我們需要開發(fā)的是哪個版本,并且在真正的開發(fā)工作開始之前,應(yīng)當先測試一下保證 kafka 的版本 、 flink 的版本一致,至少大版本一致,不存在沖突問題,不要為以后的部署埋坑。

解決方案

步驟 1 確定 flink / scala / flink-connect-kafka 的版本

比如 flink 選擇的是 1.12.7 這個版本,我們前去 maven 倉庫查看 flink-connect-kafka 的版本。首先訪問 鏈接1 https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka 找到對應(yīng)的 1.12.7 這個版本,也就是根據(jù) flink 的版本去尋找 flink-connect-kafka 的版本,記作 鏈接2 即 https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka_2.12/1.12.7。

進入 鏈接2 對應(yīng)的地址后,可以發(fā)現(xiàn)提供的 maven 地址如下:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.12.7</version>
</dependency>

這個地方已經(jīng)明確地指出, kafka_2.12 指的是對應(yīng)的是使用 scala 的版本是 2.12 編寫的 kakfa ,也就是對應(yīng)的是 scala 2.12 的版本。為了確保無誤,請確保安裝的 kafka 也是這個版本。

類似地,如果是其他版本的 flink 也要找到對應(yīng)的flink-connector-kafka 版本,確保 kafka 的版本的 scala 是一致的。

pom.xml 案例

為了規(guī)范,我們把版本號寫在前面,然后再引用這些依賴。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.smileyan.demo</groupId>
    <artifactId>flink-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <properties>
        <java.version>8</java.version>
        <!-- flink 的版本 -->
        <flink.version>1.12.7</flink.version>
        <!-- scala 的版本(也就是 kafka 的源碼的版本)-->
        <scala.binary.version>2.12</scala.binary.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
        <slf4j.version>2.0.7</slf4j.version>
    </properties>
    <profiles>
        <profile>
            <id>local</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <flink.scope>compile</flink.scope>
            </properties>
        </profile>
        <profile>
            <id>prod</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <properties>
                <flink.scope>provided</flink.scope>
            </properties>
        </profile>
    </profiles>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-simple</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>${maven-shade-plugin.version}</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

擴展

根據(jù)實際需要,調(diào)整 flink 的版本以及 scala 的版本。一定要確保最終我們在 maven 倉庫中能找到對應(yīng)的版本。

JAVA 代碼示例

再次強調(diào):一定要找到對應(yīng)的版本的示例。新版本的 flink 不再支持 new FlinkKafkaProducer 以及 new FlinkKafkaConsumer 這類操作,所以一定要結(jié)合實際情況進行調(diào)整。

flink version <= 1.13

消費者

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);

DataStream<String> stream = env.addSource(myConsumer);

生產(chǎn)者

DataStream<String> stream = ...

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
            return new ProducerRecord<>(
                    "my-topic", // target topic
                    element.getBytes(StandardCharsets.UTF_8)); // record contents
            }
        };

FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "my-topic",             // target topic
        serializationSchema,    // serialization schema
        properties,             // producer config
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

stream.addSink(myProducer);

flink version > 1.13

消費者

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

生產(chǎn)者

DataStream<String> stream = ...
        
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build()
        )
        .build();
        
stream.sinkTo(sink);

依賴<scope> 設(shè)置為 provided

flink 的 job 開發(fā)完成以后,需要打包上傳到服務(wù)器端運行,實際上 flink-java flink-stream 等等依賴包在服務(wù)器端的 flink 都提供了這些依賴,所以打包的時候可以去除這些依賴,以減小打包后的 jar 文件的大小。

所以如上面的 pom.xml 文件所示,我們把很多依賴的 scope 設(shè)置為 provided,但是帶來的新問題就是運行 flink job 的main方法時會出現(xiàn)報錯提示這些依賴找不到:

flink開發(fā)常見問題 —— flink-kafka 依賴版本沖突問題
這時我們需要進行配置,步驟如下:

flink開發(fā)常見問題 —— flink-kafka 依賴版本沖突問題
flink開發(fā)常見問題 —— flink-kafka 依賴版本沖突問題
flink開發(fā)常見問題 —— flink-kafka 依賴版本沖突問題
再次運行就不會報錯了。

參考鏈接

  • https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
  • https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

總結(jié)

flink-kafka 的項目創(chuàng)建本身應(yīng)當是一件很容易的事情,但是為了避免為以后的開發(fā)埋雷,一定要規(guī)范地編寫依賴,并結(jié)合實際情況對版本進行調(diào)整,并非一切都應(yīng)當用最新版本的。

Smileyan
2023-03-25 00:29文章來源地址http://www.zghlxwxcb.cn/news/detail-465339.html

到了這里,關(guān)于flink開發(fā)常見問題 —— flink-kafka 依賴版本沖突問題的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 關(guān)于Qt程序打包后運行庫依賴的常見問題分析及解決方法

    關(guān)于Qt程序打包后運行庫依賴的常見問題分析及解決方法

    目錄 一. 大致如下常見問題: (1)找不到程序所依賴的Qt庫 version `Qt_5\\\' not found (required by (2)Could not Load the Qt platform plugin \\\"xcb\\\" in \\\"\\\" even though it was found (3)打包到在不同的linux系統(tǒng)下,或者打包到高版本的相同系統(tǒng)下,運行程序時,直接提示段錯誤即segmentation fault,或者I

    2023年04月17日
    瀏覽(25)
  • Flink|《Flink 官方文檔 - 部署 - 內(nèi)存配置 - 調(diào)優(yōu)指南 & 常見問題》學習筆記

    學習文檔: 《Flink 官方文檔 - 部署 - 內(nèi)存配置 - 調(diào)優(yōu)指南》 《Flink 官方文檔 - 部署 - 內(nèi)存配置 - 常見問題》 學習筆記如下: 獨立部署模式(Standalone Deployment)下的內(nèi)存配置 通常無需配置進程總內(nèi)存,因為不管是 Flink 還是部署環(huán)境都不會對 JVM 開銷進行限制,它只與機器的

    2024年02月19日
    瀏覽(24)
  • Flink本地集群部署啟動&常見問題的解決方法

    Flink本地集群部署啟動&常見問題的解決方法

    [zhangflink@9wmwtivvjuibcd2e software]$ vim flink/conf/flink-conf.yaml [zhangflink@9wmwtivvjuibcd2e software]$ vim flink/conf/workers [zhangflink@9wmwtivvjuibcd2e software]$ xsync flink/conf/ 啟動集群在jobmanager那臺機器啟動 [zhangflink@9wmwtivvjuibcd2e-0001 flink]$ bin/start-cluster.sh 啟動成功jobmanager會出現(xiàn)如下進程 啟動成功taskm

    2024年02月02日
    瀏覽(28)
  • 八、Kafka時間輪與常見問題

    八、Kafka時間輪與常見問題

    Kafka中存在大量的延時操作。 1、發(fā)送消息-超時+重試機制 2、ACKS 用于指定分區(qū)中必須要有多少副本收到這條消息,生產(chǎn)者才認為寫入成功(延時 等) Kafka并沒有使用JDK自帶的Timer或者DelayQueue來實現(xiàn)延遲的功能,而是基于時間輪自定義了一個用于實現(xiàn)延遲功能的定時器(Syst

    2024年02月15日
    瀏覽(22)
  • Linux部署Kafka及常見問題記錄

    Linux部署Kafka及常見問題記錄

    監(jiān)控 Metrics 網(wǎng)站活動追蹤 Website Activity Tracking 日志收集 Log Aggregation 流處理 Stream Processing 事件溯源 Event Sourcing 提交日志 Commit Log Broker 和AMQP里協(xié)議的概念一樣, 就是消息中間件所在的服務(wù)器 Topic(主題) 每條發(fā)布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上

    2024年02月02日
    瀏覽(22)
  • Streampark集成Cloudera Flink、ldap、告警,以及部署常見問題

    Streampark集成Cloudera Flink、ldap、告警,以及部署常見問題

    集成背景 我們當前集群使用的是Cloudera CDP,F(xiàn)link版本為Cloudera Version 1.14,整體Flink安裝目錄以及配置文件結(jié)構(gòu)與社區(qū)版本有較大出入。直接根據(jù)Streampark官方文檔進行部署,將無法配置Flink Home,以及后續(xù)整體Flink任務(wù)提交到集群中,因此需要進行針對化適配集成,在滿足使用需

    2023年04月09日
    瀏覽(28)
  • 大數(shù)據(jù)_面試_ETL組件常見問題_spark&flink

    問題列表 回答 spark與flink的主要區(qū)別 flink cdc如何確保冪等與一致性 Flink SQL CDC 實踐以及一致性分析-阿里云開發(fā)者社區(qū) spark 3.0 AQE動態(tài)優(yōu)化 hbase memorystore blockcache sparksql如何調(diào)優(yōu) 通過webui定位那個表以及jobid,jobid找對應(yīng)的執(zhí)行計劃 hdfs的常見的壓縮算法 hbase的數(shù)據(jù)傾斜 spark數(shù)據(jù)處

    2024年02月16日
    瀏覽(25)
  • Android開發(fā)常見問題

    Android開發(fā)常見問題

    看下當前工程目錄中是否存在gradle目錄,如果不存在,創(chuàng)建一個新的工程,拷貝新工程的gradle文件夾到當前工程。gradle中有兩個文件。 解決方法: 1.找到c盤下的gradle.properties文件 2.將代理注釋 3.在gradle中設(shè)置不使用代理,重新加載,問題解決 gradle的版本太老了。將gradle升級

    2024年02月13日
    瀏覽(25)
  • JAVA開發(fā)中常見問題

    JAVA開發(fā)中常見問題

    目錄 1.深淺克隆問題 2.Mysql中可以代替左模糊或全查詢的函數(shù)方法 3.開發(fā)時需注意,使用String類的equals()方法時,原則上需要左邊的變量不能為null值,避免程序執(zhí)行時出現(xiàn)空指針報錯 4.Mysql Update的高效應(yīng)用 5.Mysql Insert 的高效應(yīng)用 6.在try-catch-finally代碼塊中return或者throw Exception時需

    2024年02月05日
    瀏覽(29)
  • Kafka如何保證消息的消費順序【全局有序、局部有序】、Kafka如何保證消息不被重復(fù)消費、Kafka為什么這么快?【重點】、Kafka常見問題匯總【史上最全】

    Kafka如何保證消息的消費順序【全局有序、局部有序】、Kafka如何保證消息不被重復(fù)消費、Kafka為什么這么快?【重點】、Kafka常見問題匯總【史上最全】

    目錄 Kafka消息生產(chǎn) 一個Topic對應(yīng)一個Partition 一個Topic對應(yīng)多個Partition Kafka消息的順序性保證(Producer、Consumer) 全局有序 局部有序? max.in.flight.requests.per.connection參數(shù)詳解 Kafka的多副本機制 Kafka的follower從leader同步數(shù)據(jù)的流程 Kafka的follower為什么不能用于消息消費 Kafka的多分區(qū)

    2024年04月11日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包