問題描述
由于 flink / kafka 的版本不斷更新,創(chuàng)建項目的時候就應(yīng)當考慮清楚這幾個依賴庫的版本問題,盡可能地與實際場景保持一致,比如服務(wù)器上部署的 kafka 是哪個版本,flink 是哪個版本,從而確定我們需要開發(fā)的是哪個版本,并且在真正的開發(fā)工作開始之前,應(yīng)當先測試一下保證 kafka 的版本 、 flink 的版本一致,至少大版本一致,不存在沖突問題,不要為以后的部署埋坑。
解決方案
步驟 1 確定 flink / scala / flink-connect-kafka 的版本
比如 flink 選擇的是 1.12.7
這個版本,我們前去 maven 倉庫查看 flink-connect-kafka 的版本。首先訪問 鏈接1
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka 找到對應(yīng)的 1.12.7
這個版本,也就是根據(jù) flink 的版本去尋找 flink-connect-kafka 的版本,記作 鏈接2
即 https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka_2.12/1.12.7。
進入 鏈接2
對應(yīng)的地址后,可以發(fā)現(xiàn)提供的 maven 地址如下:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.12.7</version>
</dependency>
這個地方已經(jīng)明確地指出, kafka_2.12
指的是對應(yīng)的是使用 scala
的版本是 2.12
編寫的 kakfa ,也就是對應(yīng)的是 scala 2.12 的版本。為了確保無誤,請確保安裝的 kafka
也是這個版本。
類似地
,如果是其他版本的 flink
也要找到對應(yīng)的flink-connector-kafka
版本,確保 kafka 的版本的 scala 是一致的。
pom.xml 案例
為了規(guī)范,我們把版本號寫在前面,然后再引用這些依賴。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.smileyan.demo</groupId>
<artifactId>flink-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
<java.version>8</java.version>
<!-- flink 的版本 -->
<flink.version>1.12.7</flink.version>
<!-- scala 的版本(也就是 kafka 的源碼的版本)-->
<scala.binary.version>2.12</scala.binary.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
<slf4j.version>2.0.7</slf4j.version>
</properties>
<profiles>
<profile>
<id>local</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<flink.scope>compile</flink.scope>
</properties>
</profile>
<profile>
<id>prod</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<flink.scope>provided</flink.scope>
</properties>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
擴展
根據(jù)實際需要,調(diào)整 flink 的版本以及 scala 的版本。一定要確保最終我們在 maven 倉庫中能找到對應(yīng)的版本。
JAVA 代碼示例
再次強調(diào)
:一定要找到對應(yīng)的版本的示例。新版本的 flink 不再支持 new FlinkKafkaProducer 以及 new FlinkKafkaConsumer 這類操作,所以一定要結(jié)合實際情況進行調(diào)整。
flink version <= 1.13
消費者
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema(),
properties);
DataStream<String> stream = env.addSource(myConsumer);
生產(chǎn)者
DataStream<String> stream = ...
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
return new ProducerRecord<>(
"my-topic", // target topic
element.getBytes(StandardCharsets.UTF_8)); // record contents
}
};
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
"my-topic", // target topic
serializationSchema, // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
stream.addSink(myProducer);
flink version > 1.13
消費者
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
生產(chǎn)者
DataStream<String> stream = ...
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()
)
.build();
stream.sinkTo(sink);
依賴<scope> 設(shè)置為 provided
flink 的 job 開發(fā)完成以后,需要打包上傳到服務(wù)器端運行,實際上 flink-java
flink-stream
等等依賴包在服務(wù)器端的 flink 都提供了這些依賴,所以打包的時候可以去除這些依賴,以減小打包后的 jar 文件的大小。
所以如上面的 pom.xml 文件所示,我們把很多依賴的 scope 設(shè)置為 provided
,但是帶來的新問題就是運行 flink job 的main方法時會出現(xiàn)報錯提示這些依賴找不到:
這時我們需要進行配置,步驟如下:
再次運行就不會報錯了。
參考鏈接
- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/
- https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
總結(jié)
flink-kafka 的項目創(chuàng)建本身應(yīng)當是一件很容易的事情,但是為了避免為以后的開發(fā)埋雷,一定要規(guī)范地編寫依賴,并結(jié)合實際情況對版本進行調(diào)整,并非一切都應(yīng)當用最新版本的。文章來源:http://www.zghlxwxcb.cn/news/detail-465339.html
Smileyan
2023-03-25 00:29文章來源地址http://www.zghlxwxcb.cn/news/detail-465339.html
到了這里,關(guān)于flink開發(fā)常見問題 —— flink-kafka 依賴版本沖突問題的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!