目錄
1、添加POM依賴
2、API使用說(shuō)明
3、序列化器
3.1 使用預(yù)定義的序列化器
3.2 使用自定義的序列化器
4、容錯(cuò)保證級(jí)別
4.1?至少一次 的配置
4.2?精確一次 的配置
5、這是一個(gè)完整的入門(mén)案例
1、添加POM依賴
Apache Flink 集成了通用的 Kafka 連接器,使用時(shí)需要根據(jù)生產(chǎn)環(huán)境的版本引入相應(yīng)的依賴
<!-- 引入 kafka連接器依賴-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.1</version>
</dependency>
2、API使用說(shuō)明
KafkaSink
?可將數(shù)據(jù)流寫(xiě)入一個(gè)或多個(gè) Kafka topic。
官網(wǎng)鏈接:官網(wǎng)鏈接
DataStream<String> stream = ...;
KafkaSink<String> sink = KafkaSink.<String>builder() // 泛型為 輸入輸入的類(lèi)型
// TODO 必填項(xiàng):配置 kafka 的地址和端口
.setBootstrapServers(brokers)
// TODO 必填項(xiàng):配置消息序列化器信息 Topic名稱、消息序列化器類(lèi)型
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// TODO 必填項(xiàng):配置容錯(cuò)保證級(jí)別 精準(zhǔn)一次、至少一次、不做任何保證
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
stream.sinkTo(sink);
3、序列化器
序列化器的作用是將flink數(shù)據(jù)轉(zhuǎn)換成 kafka的ProducerRecord
3.1 使用預(yù)定義的序列化器
功能:將?DataStream 數(shù)據(jù)轉(zhuǎn)換為 Kafka消息中的value,key為默認(rèn)值null,timestamp為默認(rèn)值
// 初始化 KafkaSink 實(shí)例
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
// TODO 必填項(xiàng):配置 kafka 的地址和端口
.setBootstrapServers("worker01:9092")
// TODO 必填項(xiàng):配置消息序列化器信息 Topic名稱、消息序列化器類(lèi)型
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("20230912")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build();
3.2 使用自定義的序列化器
功能:可以對(duì) kafka消息的key、value、partition、timestamp進(jìn)行賦值
/**
* 如果要指定寫(xiě)入kafka的key,可以自定義序列化器:
* 1、實(shí)現(xiàn) 一個(gè)接口,重寫(xiě) 序列化 方法
* 2、指定key,轉(zhuǎn)成 字節(jié)數(shù)組
* 3、指定value,轉(zhuǎn)成 字節(jié)數(shù)組
* 4、返回一個(gè) ProducerRecord對(duì)象,把key、value放進(jìn)去
*/
// 初始化 KafkaSink 實(shí)例 (自定義 KafkaRecordSerializationSchema 實(shí)例)
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
// TODO 必填項(xiàng):配置 kafka 的地址和端口
.setBootstrapServers("worker01:9092")
// TODO 必填項(xiàng):配置消息序列化器信息 Topic名稱、消息序列化器類(lèi)型
.setRecordSerializer(
new KafkaRecordSerializationSchema<String>() {
@Nullable
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
String[] datas = element.split(",");
byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
byte[] value = element.getBytes(StandardCharsets.UTF_8);
Long currTimestamp = System.currentTimeMillis();
Integer partition = 0;
return new ProducerRecord<>("20230913", partition, currTimestamp, key, value);
}
}
)
.build();
4、容錯(cuò)保證級(jí)別
KafkaSink
?總共支持三種不同的語(yǔ)義保證(DeliveryGuarantee
)
-
DeliveryGuarantee.NONE
? ?不提供任何保證- 消息有可能會(huì)因 Kafka broker 的原因發(fā)生丟失或因 Flink 的故障發(fā)生重復(fù)
-
DeliveryGuarantee.AT_LEAST_ONCE?
?至少一次- sink 在 checkpoint 時(shí)會(huì)等待 Kafka 緩沖區(qū)中的數(shù)據(jù)全部被 Kafka producer 確認(rèn)。
- 消息不會(huì)因 Kafka broker 端發(fā)生的事件而丟失,但可能會(huì)在 Flink 重啟時(shí)重復(fù),因?yàn)?Flink 會(huì)重新處理舊數(shù)據(jù)。
-
DeliveryGuarantee.EXACTLY_ONCE 精確
一次- 該模式下,Kafka sink 會(huì)將所有數(shù)據(jù)通過(guò)在 checkpoint 時(shí)提交的事務(wù)寫(xiě)入。
- 因此,如果 consumer 只讀取已提交的數(shù)據(jù)(參見(jiàn) Kafka consumer 配置?
isolation.level
),在 Flink 發(fā)生重啟時(shí)不會(huì)發(fā)生數(shù)據(jù)重復(fù)。 - 然而這會(huì)使數(shù)據(jù)在 checkpoint 完成時(shí)才會(huì)可見(jiàn),因此請(qǐng)按需調(diào)整 checkpoint 的間隔。
- 請(qǐng)確認(rèn)事務(wù) ID 的前綴(transactionIdPrefix)對(duì)不同的應(yīng)用是唯一的,以保證不同作業(yè)的事務(wù) 不會(huì)互相影響!此外,強(qiáng)烈建議將 Kafka 的事務(wù)超時(shí)時(shí)間調(diào)整至遠(yuǎn)大于 checkpoint 最大間隔 + 最大重啟時(shí)間,否則 Kafka 對(duì)未提交事務(wù)的過(guò)期處理會(huì)導(dǎo)致數(shù)據(jù)丟失。
4.1?至少一次 的配置
DataStream<String> stream = ...;
// 初始化 KafkaSink 實(shí)例
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
// TODO 必填項(xiàng):配置 kafka 的地址和端口
.setBootstrapServers("worker01:9092")
// TODO 必填項(xiàng):配置消息序列化器信息 Topic名稱、消息序列化器類(lèi)型
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("20230912")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// TODO 必填項(xiàng):配置容災(zāi)保證級(jí)別設(shè)置為 至少一次
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
stream.sinkTo(sink);
4.2?精確一次 的配置
// 如果是精準(zhǔn)一次,必須開(kāi)啟checkpoint
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
DataStream<String> stream = ...;
KafkaSink<String> sink = KafkaSink.<String>builder() // 泛型為 輸入輸入的類(lèi)型
// TODO 必填項(xiàng):配置 kafka 的地址和端口
.setBootstrapServers(brokers)
// TODO 必填項(xiàng):配置消息序列化器信息 Topic名稱、消息序列化器類(lèi)型
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// TODO 必填項(xiàng):配置容災(zāi)保證級(jí)別設(shè)置為 精準(zhǔn)一次
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// 如果是精準(zhǔn)一次,必須設(shè)置 事務(wù)的前綴
.setTransactionalIdPrefix("flink-")
// 如果是精準(zhǔn)一次,必須設(shè)置 事務(wù)超時(shí)時(shí)間: 大于checkpoint間隔,小于 max 15分鐘
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
.build();
stream.sinkTo(sink);
5、這是一個(gè)完整的入門(mén)案例
需求:Flink實(shí)時(shí)讀取?socket數(shù)據(jù)源,將讀取到的數(shù)據(jù)寫(xiě)入到Kafka (要保證不丟失,不重復(fù))
開(kāi)發(fā)語(yǔ)言:java1.8文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-709426.html
flink版本:flink1.17.0文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-709426.html
package com.baidu.datastream.sink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;
// TODO flink 數(shù)據(jù)輸出到kafka
public class SinkKafka {
public static void main(String[] args) throws Exception {
// 1.獲取執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 如果是精準(zhǔn)一次,必須開(kāi)啟checkpoint
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
// 2.指定數(shù)據(jù)源
DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);
// 3.初始化 KafkaSink 實(shí)例
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
// TODO 必填項(xiàng):配置 kafka 的地址和端口
.setBootstrapServers("worker01:9092")
// TODO 必填項(xiàng):配置消息序列化器信息 Topic名稱、消息序列化器類(lèi)型
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("20230912")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// TODO 必填項(xiàng):配置容災(zāi)保證級(jí)別設(shè)置為 精準(zhǔn)一次
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// 如果是精準(zhǔn)一次,必須設(shè)置 事務(wù)的前綴
.setTransactionalIdPrefix("flink-")
// 如果是精準(zhǔn)一次,必須設(shè)置 事務(wù)超時(shí)時(shí)間: 大于checkpoint間隔,小于 max 15分鐘
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
.build();
streamSource.sinkTo(kafkaSink);
// 3.觸發(fā)程序執(zhí)行
env.execute();
}
}
到了這里,關(guān)于6.2、Flink數(shù)據(jù)寫(xiě)入到Kafka的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!