flink高級(jí)版本后,消費(fèi)kafka數(shù)據(jù)一種是Datastream 一種之tableApi。
上官網(wǎng)?Kafka | Apache Flink
Kafka Source
引入依賴 flink和kafka的連接器,里面內(nèi)置了kafka-client
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.16.2</version>
</dependency>
使用方法
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
很簡單一目了然。
topic和partition
多個(gè)topic
KafkaSource.builder().setTopics("topic-a", "topic-b");
正則匹配多個(gè)topic
KafkaSource.builder().setTopicPattern("topic.*");
指定分區(qū)
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
new TopicPartition("topic-a", 0), // Partition 0 of topic "topic-a"
new TopicPartition("topic-b", 5))); // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet);
?反序列化
import org.apache.kafka.common.serialization.StringDeserializer;
KafkaSource.<String>builder()
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
其實(shí)就是實(shí)現(xiàn)接口?DeserializationSchema 的deserialize()方法 把byte轉(zhuǎn)為你想要的類型。
起始消費(fèi)位點(diǎn)
KafkaSource.builder()
// 從消費(fèi)組提交的位點(diǎn)開始消費(fèi),不指定位點(diǎn)重置策略
.setStartingOffsets(OffsetsInitializer.committedOffsets())
// 從消費(fèi)組提交的位點(diǎn)開始消費(fèi),如果提交位點(diǎn)不存在,使用最早位點(diǎn)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
// 從時(shí)間戳大于等于指定時(shí)間戳(毫秒)的數(shù)據(jù)開始消費(fèi)
.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
// 從最早位點(diǎn)開始消費(fèi)
.setStartingOffsets(OffsetsInitializer.earliest())
// 從最末尾位點(diǎn)開始消費(fèi)
.setStartingOffsets(OffsetsInitializer.latest());
有界 / 無界模式?
Kafka Source 支持流式和批式兩種運(yùn)行模式。默認(rèn)情況下,KafkaSource 設(shè)置為以流模式運(yùn)行,因此作業(yè)永遠(yuǎn)不會(huì)停止,直到 Flink 作業(yè)失敗或被取消。 可以使用?setBounded(OffsetsInitializer)
?指定停止偏移量使 Kafka Source 以批處理模式運(yùn)行。當(dāng)所有分區(qū)都達(dá)到其停止偏移量時(shí),Kafka Source 會(huì)退出運(yùn)行。
流模式下運(yùn)行通過使用?setUnbounded(OffsetsInitializer)
?也可以指定停止消費(fèi)位點(diǎn),當(dāng)所有分區(qū)達(dá)到其指定的停止偏移量時(shí),Kafka Source 會(huì)退出運(yùn)行。
估計(jì)99的人都用不到這個(gè) 。也就是設(shè)置一個(gè)結(jié)束的offset的點(diǎn)
其他屬性
除了上述屬性之外,您還可以使用 setProperties(Properties) 和 setProperty(String, String) 為 Kafka Source 和 Kafka Consumer 設(shè)置任意屬性。KafkaSource 有以下配置項(xiàng):
-
client.id.prefix
,指定用于 Kafka Consumer 的客戶端 ID 前綴 -
partition.discovery.interval.ms
,定義 Kafka Source 檢查新分區(qū)的時(shí)間間隔。 請(qǐng)參閱下面的動(dòng)態(tài)分區(qū)檢查一節(jié) -
register.consumer.metrics
?指定是否在 Flink 中注冊 Kafka Consumer 的指標(biāo) -
commit.offsets.on.checkpoint
?指定是否在進(jìn)行 checkpoint 時(shí)將消費(fèi)位點(diǎn)提交至 Kafka broker? 這個(gè)還是有用的
Kafka consumer 的配置可以參考?Apache Kafka 文檔。
請(qǐng)注意,即使指定了以下配置項(xiàng),構(gòu)建器也會(huì)將其覆蓋:
-
key.deserializer
?始終設(shè)置為 ByteArrayDeserializer -
value.deserializer
?始終設(shè)置為 ByteArrayDeserializer -
auto.offset.reset.strategy
?被 OffsetsInitializer#getAutoOffsetResetStrategy() 覆蓋 -
partition.discovery.interval.ms
?會(huì)在批模式下被覆蓋為 -1
動(dòng)態(tài)分區(qū)檢查
為了在不重啟 Flink 作業(yè)的情況下處理 Topic 擴(kuò)容或新建 Topic 等場景,可以將 Kafka Source 配置為在提供的 Topic / Partition 訂閱模式下定期檢查新分區(qū)。要啟用動(dòng)態(tài)分區(qū)檢查,請(qǐng)將?partition.discovery.interval.ms
?設(shè)置為非負(fù)值:
KafkaSource.builder()
.setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒檢查一次新分區(qū)
分區(qū)檢查功能默認(rèn) 不開啟。需要顯式地設(shè)置分區(qū)檢查間隔才能啟用此功能。
這個(gè)是為了擴(kuò)容的,因?yàn)閗afka的消費(fèi)能力和分區(qū)有關(guān),消費(fèi)能力不夠的時(shí)候需要?jiǎng)討B(tài)增加分區(qū)?
事件時(shí)間和水印
默認(rèn)情況下,Kafka Source 使用 Kafka 消息中的時(shí)間戳作為事件時(shí)間。您可以定義自己的水印策略(Watermark Strategy) 以從消息中提取事件時(shí)間,并向下游發(fā)送水?。?/p>
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");
消費(fèi)位點(diǎn)提交?
Kafka source 在 checkpoint?完成時(shí)提交當(dāng)前的消費(fèi)位點(diǎn) ,以保證 Flink 的 checkpoint 狀態(tài)和 Kafka broker 上的提交位點(diǎn)一致。如果未開啟 checkpoint,Kafka source 依賴于 Kafka consumer 內(nèi)部的位點(diǎn)定時(shí)自動(dòng)提交邏輯,自動(dòng)提交功能由?enable.auto.commit
?和?auto.commit.interval.ms
?兩個(gè) Kafka consumer 配置項(xiàng)進(jìn)行配置。
注意:Kafka source?不依賴于 broker 上提交的位點(diǎn)來恢復(fù)失敗的作業(yè)。提交位點(diǎn)只是為了上報(bào) Kafka consumer 和消費(fèi)組的消費(fèi)進(jìn)度,以在 broker 端進(jìn)行監(jiān)控。
這里是說明消費(fèi)的offset是由kafka管理還是flink管理。
舉個(gè)例子
.setProperty("enable.auto.commit","true") .setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100")
并且env沒有?env.enableCheckpoint()
此時(shí)只要消息進(jìn)入到flink,過了100ms就會(huì)被認(rèn)為已經(jīng)消費(fèi)過了offset會(huì)+1 不管你這個(gè)消息是否處理完,是否處理失敗了。
但是如果你env.enableCheckpoint(),那么此時(shí)就是由checkpoint被提交的之前提交offset了。和checkpoint息息相關(guān)。checkpoint如果失敗了 那么這個(gè)offset就不會(huì)被提交了。
kafka安全認(rèn)證
KafkaSource.builder()
.setProperty("security.protocol", "SASL_PLAINTEXT")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
上面這個(gè)比較常用?
另一個(gè)更復(fù)雜的例子,使用 SASL_SSL 作為安全協(xié)議并使用 SCRAM-SHA-256 作為 SASL 機(jī)制:
KafkaSource.builder()
.setProperty("security.protocol", "SASL_SSL")
// SSL 配置
// 配置服務(wù)端提供的 truststore (CA 證書) 的路徑
.setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
.setProperty("ssl.truststore.password", "test1234")
// 如果要求客戶端認(rèn)證,則需要配置 keystore (私鑰) 的路徑
.setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
.setProperty("ssl.keystore.password", "test1234")
// SASL 配置
// 將 SASL 機(jī)制配置為 as SCRAM-SHA-256
.setProperty("sasl.mechanism", "SCRAM-SHA-256")
// 配置 JAAS
.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
如果在作業(yè) JAR 中 Kafka 客戶端依賴的類路徑被重置了(relocate class),登錄模塊(login module)的類路徑可能會(huì)不同,因此請(qǐng)根據(jù)登錄模塊在 JAR 中實(shí)際的類路徑來改寫以上配置。
Kafka Sink
KafkaSink
?可將數(shù)據(jù)流寫入一個(gè)或多個(gè) Kafka topic。
使用方法
DataStream<String> stream = ...;
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
stream.sinkTo(sink);
以下屬性在構(gòu)建 KafkaSink 時(shí)是必須指定的:
- Bootstrap servers,?
setBootstrapServers(String)
- 消息序列化器(Serializer),?
setRecordSerializer(KafkaRecordSerializationSchema)
- 如果使用
DeliveryGuarantee.EXACTLY_ONCE
?的語義保證,則需要使用?setTransactionalIdPrefix(String)
?序列化器
KafkaRecordSerializationSchema.builder()
.setTopicSelector((element) -> {<your-topic-selection-logic>})
.setValueSerializationSchema(new SimpleStringSchema())
.setKeySerializationSchema(new SimpleStringSchema())
.setPartitioner(new FlinkFixedPartitioner())
.build();
其中消息體(value)序列化方法和 topic 的選擇方法是必須指定的,此外也可以通過?setKafkaKeySerializer(Serializer)
?或?setKafkaValueSerializer(Serializer)
?來使用 Kafka 提供而非 Flink 提供的序列化器。
容錯(cuò)
KafkaSink
?總共支持三種不同的語義保證(DeliveryGuarantee
)。對(duì)于?DeliveryGuarantee.AT_LEAST_ONCE
?和?DeliveryGuarantee.EXACTLY_ONCE
,F(xiàn)link checkpoint 必須啟用。默認(rèn)情況下?KafkaSink
?使用?DeliveryGuarantee.NONE
。 以下是對(duì)不同語義保證的解釋:
一旦啟用了基于 Kerberos 的 Flink 安全性后,只需在提供的屬性配置中包含以下兩個(gè)設(shè)置(通過傳遞給內(nèi)部 Kafka 客戶端),即可使用 Flink Kafka Consumer 或 Producer 向 Kafk a進(jìn)行身份驗(yàn)證:
-
DeliveryGuarantee.NONE
?不提供任何保證:消息有可能會(huì)因 Kafka broker 的原因發(fā)生丟失或因 Flink 的故障發(fā)生重復(fù)。 -
DeliveryGuarantee.AT_LEAST_ONCE
: sink 在 checkpoint 時(shí)會(huì)等待 Kafka 緩沖區(qū)中的數(shù)據(jù)全部被 Kafka producer 確認(rèn)。消息不會(huì)因 Kafka broker 端發(fā)生的事件而丟失,但可能會(huì)在 Flink 重啟時(shí)重復(fù),因?yàn)?Flink 會(huì)重新處理舊數(shù)據(jù)。 -
DeliveryGuarantee.EXACTLY_ONCE
: 該模式下,Kafka sink 會(huì)將所有數(shù)據(jù)通過在 checkpoint 時(shí)提交的事務(wù)寫入。因此,如果 consumer 只讀取已提交的數(shù)據(jù)(參見 Kafka consumer 配置?isolation.level
),在 Flink 發(fā)生重啟時(shí)不會(huì)發(fā)生數(shù)據(jù)重復(fù)。然而這會(huì)使數(shù)據(jù)在 checkpoint 完成時(shí)才會(huì)可見,因此請(qǐng)按需調(diào)整 checkpoint 的間隔。請(qǐng)確認(rèn)事務(wù) ID 的前綴(transactionIdPrefix)對(duì)不同的應(yīng)用是唯一的,以保證不同作業(yè)的事務(wù) 不會(huì)互相影響!此外,強(qiáng)烈建議將 Kafka 的事務(wù)超時(shí)時(shí)間調(diào)整至遠(yuǎn)大于 checkpoint 最大間隔 + 最大重啟時(shí)間,否則 Kafka 對(duì)未提交事務(wù)的過期處理會(huì)導(dǎo)致數(shù)據(jù)丟失。 -
啟用 Kerberos 身份驗(yàn)證
-
Flink 通過 Kafka 連接器提供了一流的支持,可以對(duì) Kerberos 配置的 Kafka 安裝進(jìn)行身份驗(yàn)證。只需在?
flink-conf.yaml
?中配置 Flink。像這樣為 Kafka 啟用 Kerberos 身份驗(yàn)證:
1.通過設(shè)置以下內(nèi)容配置 Kerberos 票據(jù) -
security.kerberos.login.use-ticket-cache
:默認(rèn)情況下,這個(gè)值是?true
,F(xiàn)link 將嘗試在?kinit
?管理的票據(jù)緩存中使用 Kerberos 票據(jù)。注意!在 YARN 上部署的 Flink jobs 中使用 Kafka 連接器時(shí),使用票據(jù)緩存的 Kerberos 授權(quán)將不起作用。 -
security.kerberos.login.keytab
?和?security.kerberos.login.principal
:要使用 Kerberos keytabs,需為這兩個(gè)屬性設(shè)置值。
2。將?KafkaClient
?追加到?security.kerberos.login.contexts
:這告訴 Flink 將配置的 Kerberos 票據(jù)提供給 Kafka 登錄上下文以用于 Kafka 身份驗(yàn)證。 - 將?
security.protocol
?設(shè)置為?SASL_PLAINTEXT
(默認(rèn)為?NONE
):用于與 Kafka broker 進(jìn)行通信的協(xié)議。使用獨(dú)立 Flink 部署時(shí),也可以使用?SASL_SSL
;請(qǐng)?jiān)诖颂幉榭慈绾螢?SSL 配置 Kafka 客戶端。 - 將?
sasl.kerberos.service.name
?設(shè)置為?kafka
(默認(rèn)為?kafka
):此值應(yīng)與用于 Kafka broker 配置的?sasl.kerberos.service.name
?相匹配??蛻舳撕头?wù)器配置之間的服務(wù)名稱不匹配將導(dǎo)致身份驗(yàn)證失敗。
問題排查
數(shù)據(jù)丟失?
根據(jù)你的 Kafka 配置,即使在 Kafka 確認(rèn)寫入后,你仍然可能會(huì)遇到數(shù)據(jù)丟失。特別要記住在 Kafka 的配置中設(shè)置以下屬性:
acks
log.flush.interval.messages
log.flush.interval.ms
log.flush.*
UnknownTopicOrPartitionException?
導(dǎo)致此錯(cuò)誤的一個(gè)可能原因是正在進(jìn)行新的 leader 選舉,例如在重新啟動(dòng) Kafka broker 之后或期間。這是一個(gè)可重試的異常,因此 Flink job 應(yīng)該能夠重啟并恢復(fù)正常運(yùn)行。也可以通過更改 producer 設(shè)置中的?retries
?屬性來規(guī)避。但是,這可能會(huì)導(dǎo)致重新排序消息,反過來可以通過將?max.in.flight.requests.per.connection
?設(shè)置為 1 來避免不需要的消息。文章來源:http://www.zghlxwxcb.cn/news/detail-605321.html
ProducerFencedException?
這個(gè)錯(cuò)誤是由于?FlinkKafkaProducer
?所生成的?transactional.id
?與其他應(yīng)用所使用的的產(chǎn)生了沖突。多數(shù)情況下,由于?FlinkKafkaProducer
?產(chǎn)生的 ID 都是以?taskName + "-" + operatorUid
?為前綴的,這些產(chǎn)生沖突的應(yīng)用也是使用了相同 Job Graph 的 Flink Job。 我們可以使用?setTransactionalIdPrefix()
?方法來覆蓋默認(rèn)的行為,為每個(gè)不同的 Job 分配不同的?transactional.id
?前綴來解決這個(gè)問題。文章來源地址http://www.zghlxwxcb.cn/news/detail-605321.html
到了這里,關(guān)于flink1.16使用消費(fèi)/生產(chǎn)kafka之DataStream的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!