本章重點介紹生產(chǎn)環(huán)境中最常用到的Flink kafka connector
。使用Flink
的同學(xué),一定會很熟悉kafka
,它是一個分布式的、分區(qū)的、多副本的、 支持高吞吐的、發(fā)布訂閱消息系統(tǒng)。生產(chǎn)環(huán)境環(huán)境中也經(jīng)常會跟kafka
進(jìn)行一些數(shù)據(jù)的交換,比如利用kafka consumer
讀取數(shù)據(jù),然后進(jìn)行一系列的處理之后,再將結(jié)果寫出到kafka
中。這里會主要分兩個部分進(jìn)行介紹,一是Flink kafka Consumer
,一個是Flink kafka Producer
Flink 輸入輸出至 Kafka案例
首先看一個例子來串聯(lián)下Flink kafka connector
。代碼邏輯里主要是從 kafka里讀數(shù)據(jù),然后做簡單的處理,再寫回到kafka
中。首先需要引入 flink-kafka
相關(guān)的pom.xml
依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.0</version>
</dependency>
分別從如何構(gòu)造一個Source sinkFunction
。Flink
提供了現(xiàn)成的構(gòu)造FlinkKafkaConsumer
、Producer
的接口,可以直接使用。這里需要注意,因為kafka
有多個版本,多個版本之間的接口協(xié)議會不同。Flink
針對不同版本的kafka
有相應(yīng)的版本的Consumer
和Producer
。例如:針對 08
、09
、10
、11
版本,Flink
對應(yīng)的consumer
分別是FlinkKafkaConsumer 08
、09
、010
、011
,producer
也是。
package com.zzx.flink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import scala.Tuple2;
import scala.tools.nsc.transform.patmat.Logic;
import java.util.Properties;
/**
* @description: Flink 從kafka 中讀取數(shù)據(jù)并寫入kafka
* @author: zzx
* @createDate: 2020/7/22
* @version: 1.0
*/
public class FlinkKafkaExample {
public static void main(String[] args) throws Exception{
//ParameterTool 從參數(shù)中讀取數(shù)據(jù)
final ParameterTool params = ParameterTool.fromArgs(args);
//設(shè)置執(zhí)行環(huán)境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//使參數(shù)在web界面中可用
env.getConfig().setGlobalJobParameters(params);
/** TimeCharacteristic 中包含三種時間類型
* @PublicEvolving
* public enum TimeCharacteristic {
* ? //以operator處理的時間為準(zhǔn),它使用的是機(jī)器的系統(tǒng)時間來作為data stream的時間
* ProcessingTime,
* ? //以數(shù)據(jù)進(jìn)入flink streaming data flow的時間為準(zhǔn)
* IngestionTime,
* ? //以數(shù)據(jù)自帶的時間戳字段為準(zhǔn),應(yīng)用程序需要指定如何從record中抽取時間戳字段
* EventTime
* }
*/
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
/**
* CheckpointingMode: EXACTLY_ONCE(執(zhí)行一次) AT_LEAST_ONCE(至少一次)
*/
env.enableCheckpointing(60*1000, CheckpointingMode.EXACTLY_ONCE);
//------------------------------------------source start -----------------------------------
String sourceTopic = "sensor";
String bootstrapServers = "hadoop1:9092";
// kafkaConsumer 需要的配置參數(shù)
Properties props = new Properties();
// 定義kakfa 服務(wù)的地址,不需要將所有broker指定上
props.put("bootstrap.servers", bootstrapServers);
// 制定consumer group
props.put("group.id", "test");
// 是否自動確認(rèn)offset
props.put("enable.auto.commit", "true");
// 自動確認(rèn)offset的時間間隔
props.put("auto.commit.interval.ms", "1000");
// key的序列化類
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value的序列化類
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//從kafka讀取數(shù)據(jù),需要實現(xiàn) SourceFunction 他給我們提供了一個
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<String>(sourceTopic, new SimpleStringSchema(), props);
//------------------------------------------source end -----------------------------------------
//------------------------------------------sink start -----------------------------------
String sinkTopic = "topic";
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServers);
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<String>(sinkTopic, new SimpleStringSchema(), properties);
//------------------------------------------sink end --------------------------------------
//FlinkKafkaConsumer011 繼承自 RichParallelSourceFunction
env.addSource(consumer)
.map(new MapFunction<String, Tuple2<Long,String>>(){
@Override
public Tuple2<Long, String> map(String s) throws Exception {
return new Tuple2<>(1L,s);
}
})
.filter(k -> k != null)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, String>>(Time.seconds(5)) {
@Override
public long extractTimestamp(Tuple2<Long, String> element) {
return element._1;
}
})
.map(k ->k.toString())
.addSink(producer);
//執(zhí)行
env.execute("FlinkKafkaExample");
}
}
如下創(chuàng)建代碼中涉及的"sensor" Topic
[root@hadoop1 kafka_2.11-2.2.2]# bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --topic sensor --replication-factor 2 --partitions 4
Flink kafka Consumer
反序列化數(shù)據(jù): 因為kafka
中數(shù)據(jù)都是以二進(jìn)制byte
形式存儲的。讀到Flink
系統(tǒng)中之后,需要將二進(jìn)制數(shù)據(jù)轉(zhuǎn)化為具體的java
、scala
對象。具體需要實現(xiàn)一個schema
類定義如何序列化和反序列數(shù)據(jù)。反序列化時需要實現(xiàn)DeserializationSchema
接
口,并重寫deserialize(byte[] message)
函數(shù),如果是反序列化kafka
中kv
的數(shù)據(jù)時,需要實現(xiàn)KeyedDeserializationSchema
接口,并重寫 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
函數(shù)。
另外Flink
中也提供了一些常用的序列化反序列化的schema
類。例如,SimpleStringSchema
,按字符串方式進(jìn)行序列化、反序列化。TypeInformationSerializationSchema
,它可根據(jù)Flink
的TypeInformation
信息來推斷出需要選擇的schema
。JsonDeserializationSchema
使用 jackson
反序列化 json
格式消息,并返回ObjectNode
,可以使用get(“property”)
方法來訪問相應(yīng)字段。
消費起始位置設(shè)置
如何設(shè)置作業(yè)消費kafka
起始位置的數(shù)據(jù),這一部分Flink
也提供了非常好的封裝。在構(gòu)造好的FlinkKafkaConsumer
類后面調(diào)用如下相應(yīng)函數(shù),設(shè)置合適的起始位置。
【1】setStartFromGroupOffsets
,也是默認(rèn)的策略,從group offset
位置讀取數(shù)據(jù),group offset
指的是kafka broker
端記錄的某個group
的最后一次的消費位置。但是kafka broker
端沒有該group
信息,會根據(jù)kafka
的參數(shù)auto.offset.reset
的設(shè)置來決定從哪個位置開始消費。
○ setStartFromEarliest
,從kafka
最早的位置開始讀取。
○ setStartFromLatest
,從kafka
最新的位置開始讀取。
○ setStartFromTimestamp(long)
,從時間戳大于或等于指定時間戳的位置開始讀取。Kafka
時間戳,是指kafka
為每條消息增加另一個時戳。該時戳可以表示消息在proudcer
端生成時的時間、或進(jìn)入到kafka broker
時的時間。
○ setStartFromSpecificOffsets
,從指定分區(qū)的offset
位置開始讀取,如指定的offsets
中不存某個分區(qū),該分區(qū)從group offset
位置開始讀取。此時需要用戶給定一個具體的分區(qū)、offset
的集合。
一些具體的使用方法可以參考下圖。需要注意的是,因為Flink
框架有容錯機(jī)制,如果作業(yè)故障,如果作業(yè)開啟checkpoint
,會從上一次 checkpoint
狀態(tài)開始恢復(fù)?;蛘咴谕V棺鳂I(yè)的時候主動做savepoint
,啟動作業(yè)時從savepoint
開始恢復(fù)。這兩種情況下恢復(fù)作業(yè)時,作業(yè)消費起始位置是從之前保存的狀態(tài)中恢復(fù),與上面提到跟kafka
這些單獨的配置無關(guān)。
topic 和 partition 動態(tài)發(fā)現(xiàn)
實際的生產(chǎn)環(huán)境中可能有這樣一些需求:
場景一,有一個Flink
作業(yè)需要將五份數(shù)據(jù)聚合到一起,五份數(shù)據(jù)對應(yīng)五個kafka topic
,隨著業(yè)務(wù)增長,新增一類數(shù)據(jù),同時新增了一個 kafka topic
,如何在不重啟作業(yè)的情況下作業(yè)自動感知新的topic
。
場景二,作業(yè)從一個固定的kafka topic
讀數(shù)據(jù),開始該topic
有10
個partition
,但隨著業(yè)務(wù)的增長數(shù)據(jù)量變大,需要對kafka partition
個數(shù)進(jìn)行擴(kuò)容,由10
個擴(kuò)容到20
。該情況下如何在不重啟作業(yè)情況下動態(tài)感知新擴(kuò)容的partition
?
針對上面的兩種場景,首先需要在構(gòu)建FlinkKafkaConsumer
時的properties
中設(shè)置flink.partition-discovery.interval-millis
參數(shù)為非負(fù)值,表示開啟動態(tài)發(fā)現(xiàn)的開關(guān),以及設(shè)置的時間間隔。此時FlinkKafkaConsumer
內(nèi)部會啟動一個單獨的線程定期去kafka
獲取最新的meta
信息。針對場景一,還需在構(gòu)建FlinkKafkaConsumer
時,topic
的描述可以傳一個正則表達(dá)式(如下圖所示)描述的pattern
。每次獲取最新kafka meta
時獲取正則匹配的最新topic
列表。針對場景二,設(shè)置前面的動態(tài)發(fā)現(xiàn)參數(shù),在定期獲取kafka
最新meta
信息時會匹配新的partition
。為了保證數(shù)據(jù)的正確性,新發(fā)現(xiàn)的partition
從最早的位置開始讀取。
commit offset 方式
Flink kafka consumer commit offset
方式需要區(qū)分是否開啟了checkpoint
。如果checkpoint
關(guān)閉,commit offset
要依賴于kafka
客戶端的auto commit
。 需設(shè)置enable.auto.commit
,auto.commit.interval.ms
參數(shù)到consumer properties
,就會按固定的時間間隔定期auto commit offset
到 kafka
。如果開啟checkpoint
,這個時候作業(yè)消費的offset
,Flink
會在state
中自己管理和容錯。此時提交offset
到kafka
,一般都是作為外部進(jìn)度的監(jiān)控,想實時知道作業(yè)消費的位置和lag
情況。此時需要setCommitOffsetsOnCheckpoints
為true
來設(shè)置當(dāng)checkpoint
成功時提交offset
到kafka
。此時commit offset
的間隔就取決于checkpoint
的間隔,所以此時從kafka
一側(cè)看到的lag
可能并非完全實時,如果checkpoint
間隔比較長lag
曲線可能會是一個鋸齒狀。
Timestamp Extraction/Watermark 生成
我們知道當(dāng)Flink
作業(yè)內(nèi)使用EventTime
屬性時,需要指定從消息中提取時間戳和生成水位的函數(shù)。FlinkKakfaConsumer
構(gòu)造的source
后直接調(diào)用assignTimestampsAndWatermarks
函數(shù)設(shè)置水位生成器的好處是此時是每個partition
一個watermark assigner
,如下圖。source
生成的時戳為多個partition
時戳對齊后的最小時戳。此時在一個source
讀取多個partition
,并且partition
之間數(shù)據(jù)時戳有一定差距的情況下,因為在 source
端watermark
在partition
級別有對齊,不會導(dǎo)致數(shù)據(jù)讀取較慢partition
數(shù)據(jù)丟失。
Flink kafka Producer
【1】Producer
分區(qū): 使用FlinkKafkaProducer
往kafka
中寫數(shù)據(jù)時,如果不單獨設(shè)置partition
策略,會默認(rèn)使用FlinkFixedPartitioner
,該 partitioner
分區(qū)的方式是task
所在的并發(fā)id
對topic
總partition
數(shù)取余:parallelInstanceId % partitions.length
。
○ 此時如果sink
為4
,paritition
為1
,則4
個task
往同一個partition
中寫數(shù)據(jù)。但當(dāng)sink task < partition
個數(shù)時會有部分partition
沒有數(shù)據(jù)寫入,例如sink task
為2
,partition
總數(shù)為4
,則后面兩個partition
將沒有數(shù)據(jù)寫入。
○ 如果構(gòu)建FlinkKafkaProducer
時,partition
設(shè)置為null
,此時會使用kafka producer
默認(rèn)分區(qū)方式,非key
寫入的情況下,使用round-robin
的方式進(jìn)行分區(qū),每個task
都會輪循的寫下游的所有partition
。該方式下游的partition
數(shù)據(jù)會比較均衡,但是缺點是partition
個數(shù)過多的情況下需要維持過多的網(wǎng)絡(luò)連接,即每個task
都會維持跟所有partition
所在broker
的連接。
容錯
Flink kafka 09
、010
版本下,通過setLogFailuresOnly
為false
,setFlushOnCheckpoint
為true
, 能達(dá)到at-least-once
語義。setLogFailuresOnly
默認(rèn)為false
,是控制寫kafka
失敗時,是否只打印失敗的log
不拋異常讓作業(yè)停止。setFlushOnCheckpoint
,默認(rèn)為true
,是控制是否在 checkpoint
時fluse
數(shù)據(jù)到kafka
,保證數(shù)據(jù)已經(jīng)寫到kafka
。否則數(shù)據(jù)有可能還緩存在kafka
客戶端的buffer
中,并沒有真正寫出到kafka
,此時作業(yè)掛掉數(shù)據(jù)即丟失,不能做到至少一次的語義。Flink kafka 011
版本下,通過兩階段提交的sink
結(jié)合kafka
事務(wù)的功能,可以保證端到端精準(zhǔn)一次。文章來源:http://www.zghlxwxcb.cn/news/detail-763909.html
疑問與解答
【問題一】: 在Flink consumer
的并行度的設(shè)置:是對應(yīng)topic
的partitions
個數(shù)嗎?要是有多個主題數(shù)據(jù)源,并行度是設(shè)置成總體的 partitions
數(shù)嗎?
【解答】: 這個并不是絕對的,跟topic
的數(shù)據(jù)量也有關(guān),如果數(shù)據(jù)量不大,也可以設(shè)置小于partitions
個數(shù)的并發(fā)數(shù)。但不要設(shè)置并發(fā)數(shù)大于partitions
總數(shù),因為這種情況下某些并發(fā)因為分配不到partition
導(dǎo)致沒有數(shù)據(jù)處理。
【問題二】: 如果partitioner
傳null
的時候是round-robin
發(fā)到每一個partition
?如果有key
的時候行為是kafka
那種按照key
分布到具體分區(qū)的行為嗎?
【解答】: 如果在構(gòu)造FlinkKafkaProducer
時,如果沒有設(shè)置單獨的partitioner
,則默認(rèn)使用FlinkFixedPartitioner
,此時無論是帶key
的數(shù)據(jù),還是不帶key
。如果主動設(shè)置partitioner
為null
時,不帶key
的數(shù)據(jù)會round-robin
輪詢的方式寫出到partition
,帶key
的數(shù)據(jù)會根據(jù)key
,相同key
數(shù)據(jù)分區(qū)的相同的partition
。
【問題三】: 如果checkpoint
時間過長,offset
未提交到kafka
,此時節(jié)點宕機(jī)了,重啟之后的重復(fù)消費如何保證呢?
【解答】: 首先開啟checkpoint
時offset
是Flink
通過狀態(tài)state
管理和恢復(fù)的,并不是從kafka
的offset
位置恢復(fù)。在checkpoint
機(jī)制下,作業(yè)從最近一次checkpoint
恢復(fù),本身是會回放部分歷史數(shù)據(jù),導(dǎo)致部分?jǐn)?shù)據(jù)重復(fù)消費,Flink
引擎僅保證計算狀態(tài)的精準(zhǔn)一次,要想做到端到端精準(zhǔn)一次需要依賴一些冪等的存儲系統(tǒng)或者事務(wù)操作。文章來源地址http://www.zghlxwxcb.cn/news/detail-763909.html
到了這里,關(guān)于Flink Kafka[輸入/輸出] Connector的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!