国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用 Apache Flink 開發(fā)實時 ETL

這篇具有很好參考價值的文章主要介紹了使用 Apache Flink 開發(fā)實時 ETL。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Apache Flink 是大數(shù)據(jù)領(lǐng)域又一新興框架。它與 Spark 的不同之處在于,它是使用流式處理來模擬批量處理的,因此能夠提供亞秒級的、符合 Exactly-once 語義的實時處理能力。Flink 的使用場景之一是構(gòu)建實時的數(shù)據(jù)通道,在不同的存儲之間搬運和轉(zhuǎn)換數(shù)據(jù)。本文將介紹如何使用 Flink 開發(fā)實時 ETL 程序,并介紹 Flink 是如何保證其 Exactly-once 語義的。使用 Apache Flink 開發(fā)實時 ETL

示例程序

讓我們來編寫一個從 Kafka 抽取數(shù)據(jù)到 HDFS 的程序。數(shù)據(jù)源是一組事件日志,其中包含了事件發(fā)生的時間,以時間戳的方式存儲。我們需要將這些日志按事件時間分別存放到不同的目錄中,即按日分桶。時間日志示例如下:

{"timestamp":1545184226.432,"event":"page_view","uuid":"ac0e50bf-944c-4e2f-bbf5-a34b22718e0c"}
{"timestamp":1545184602.640,"event":"adv_click","uuid":"9b220808-2193-44d1-a0e9-09b9743dec55"}
{"timestamp":1545184608.969,"event":"thumbs_up","uuid":"b44c3137-4c91-4f36-96fb-80f56561c914"}

產(chǎn)生的目錄結(jié)構(gòu)為:

/user/flink/event_log/dt=20181219/part-0-1
/user/flink/event_log/dt=20181220/part-1-9

創(chuàng)建 Flink 項目

官方提供了快速生成 Flink 項目的模板,可以直接運行下面命令,這里我使用的是 flink 1.9 版本

 $ mvn archetype:generate                              \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-scala     \
      -DarchetypeVersion=1.9.0

將生成好的代碼導(dǎo)入到 IDE 中,可以看到名為 StreamingJob 的文件,我們由此開始編寫程序。

Kafka 數(shù)據(jù)源

Flink 對 Kafka 數(shù)據(jù)源提供了 原生支持,我們需要選擇正確的 Kafka 依賴版本,將其添加到 POM 文件中:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010 < > (
    "flink_test", new SimpleStringSchema(), props);
DataStream stream = env.addSource(consumer);

Flink 會連接本地的 Kafka 服務(wù),讀取 flink_test 主題中的數(shù)據(jù),轉(zhuǎn)換成字符串后返回。除了 SimpleStringSchema,F(xiàn)link 還提供了其他內(nèi)置的反序列化方式,如 JSON、Avro 等,我們也可以編寫自定義邏輯。

流式文件存儲

StreamingFileSink 替代了先前的 BucketingSink,用來將上游數(shù)據(jù)存儲到 HDFS 的不同目錄中。它的核心邏輯是分桶,默認(rèn)的分桶方式是 DateTimeBucketAssigner,即按照處理時間分桶。處理時間指的是消息到達(dá) Flink 程序的時間,這點并不符合我們的需求。因此,我們需要自己編寫代碼將事件時間從消息體中解析出來,按規(guī)則生成分桶的名稱:

public class EventTimeBucketAssigner implements BucketAssigner < String, String > {@
    Override
    public String getBucketId(String element, Context context) {
        JsonNode node = mapper.readTree(element);
        long date = (long)(node.path("timestamp").floatValue() * 1000);
        String partitionValue = new SimpleDateFormat("yyyyMMdd").format(new Date(date));
        return "dt=" + partitionValue;
    }
}

上述代碼會使用 Jackson 庫對消息體進(jìn)行解析,將時間戳轉(zhuǎn)換成日期字符串,添加前綴后返回。如此一來,StreamingFileSink 就能知道應(yīng)該將當(dāng)前記錄放置到哪個目錄中了。完整代碼可以參考 GitHub(鏈接)。

StreamingFileSink sink = StreamingFileSink
    .forRowFormat(new Path("/tmp/kafka-loader"), new SimpleStringEncoder())
    .withBucketAssigner(new EventTimeBucketAssigner())
    .build();
stream.addSink(sink);

forRowFormat 表示輸出的文件是按行存儲的,對應(yīng)的有 forBulkFormat,可以將輸出結(jié)果用 Parquet 等格式進(jìn)行壓縮存儲。

關(guān)于 StreamingFileSink 還有一點要注意,它只支持 Hadoop 2.7 以上的版本,因為需要用到高版本文件系統(tǒng)提供的 truncate 方法來實現(xiàn)故障恢復(fù),這點下文會詳述。

開啟檢查點

代碼編寫到這里,其實已經(jīng)可以通過 env.execute() 來運行了。但是,它只能保證 At-least-once 語義,即消息有可能會被重復(fù)處理。要做到 Exactly-once,我們還需要開啟 Flink 的檢查點功能:

env.enableCheckpointing(60 _000);
env.setStateBackend((StateBackend) new FsStateBackend("/tmp/flink/checkpoints"));
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

檢查點(Checkpoint)是 Flink 的故障恢復(fù)機(jī)制,同樣會在下文詳述。代碼中,我們將狀態(tài)存儲方式由 MemoryStateBackend 修改為了 FsStateBackend,即使用外部文件系統(tǒng),如 HDFS,來保存應(yīng)用程序的中間狀態(tài),這樣當(dāng) Flink JobManager 宕機(jī)時,也可以恢復(fù)過來。Flink 還支持 RocksDBStateBackend,用來存放較大的中間狀態(tài),并能支持增量的狀態(tài)更新。

提交與管理腳本

Flink 程序可以直接在 IDE 中調(diào)試。我們也可以搭建一個本地的 Flink 集群,并通過 Flink CLI 命令行工具來提交腳本:

bin/flink run -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar

腳本的運行狀態(tài)可以在 Flink 儀表盤中查看:

使用暫存點來停止和恢復(fù)腳本

當(dāng)需要暫停腳本、或?qū)Τ绦蜻壿嬤M(jìn)行修改時,我們需要用到 Flink 的暫存點機(jī)制(Savepoint)。暫存點和檢查點類似,同樣保存的是 Flink 各個算子的狀態(tài)數(shù)據(jù)(Operator State)。不同的是,暫存點主要用于人為的腳本更替,而檢查點則主要由 Flink 控制,用來實現(xiàn)故障恢復(fù)。flink cancel -s 命令可以在停止腳本的同時創(chuàng)建一個暫存點:

$ bin/flink cancel -s /tmp/flink/savepoints 1253cc85e5c702dbe963dd7d8d279038
Cancelled job 1253cc85e5c702dbe963dd7d8d279038. Savepoint stored in file:/tmp/flink/savepoints/savepoint-1253cc-0df030f4f2ee.

具體到我們的 ETL 示例程序,暫存點中保存了當(dāng)前 Kafka 隊列的消費位置、正在寫入的文件名等。當(dāng)需要從暫存點恢復(fù)執(zhí)行時,可以使用 flink run -s 傳入目錄位置。Flink 會從指定偏移量讀取消息隊列,并處理好中間結(jié)果文件,確保沒有缺失或重復(fù)的數(shù)據(jù)。

flink run -s /tmp/flink/savepoints/savepoint-1253cc-0df030f4f2ee -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar

在 YARN 上運行

要將腳本提交到 YARN 集群上運行,同樣是使用 flink run 命令。首先將代碼中指定文件目錄的部分添加上 HDFS 前綴,如 hdfs://localhost:9000/,重新打包后執(zhí)行下列命令:

$ export HADOOP_CONF_DIR=/path/to/hadoop/conf
$ bin/flink run -m yarn-cluster -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar
Submitted application application_1545534487726_0001

Flink 儀表盤會在 YARN Application Master 中運行,我們可以通過 ResourceManager 界面進(jìn)入。返回的應(yīng)用 ID 可以用來管理腳本,添加 -yid 參數(shù)即可:

bin/flink cancel -s hdfs://localhost:9000/tmp/flink/savepoints -yid application_1545534487726_0001 84de00a5e193f26c937f72a9dc97f386

Flink 如何保證 Exactly-once 語義

Flink 實時處理程序可以分為三個部分,數(shù)據(jù)源、處理流程、以及輸出。不同的數(shù)據(jù)源和輸出提供了不同的語義保證,F(xiàn)link 統(tǒng)稱為 連接器。處理流程則能提供 Exactly-once 或 At-least-once 語義,需要看檢查點是否開啟。

實時處理與檢查點

Flink 的檢查點機(jī)制是基于 Chandy-Lamport 算法的:Flink 會定時在數(shù)據(jù)流中安插輕量的標(biāo)記信息(Barrier),將消息流切割成一組組記錄;當(dāng)某個算子處理完一組記錄后,就將當(dāng)前狀態(tài)保存為一個檢查點,提交給 JobManager,該組的標(biāo)記信息也會傳遞給下游;當(dāng)末端的算子(通常是 Sink)處理完這組記錄并提交檢查點后,這個檢查點將被標(biāo)記為“已完成”;當(dāng)腳本出現(xiàn)問題時,就會從最后一個“已完成”的檢查點開始重放記錄。

如果算子有多個上游,F(xiàn)link 會使用一種稱為“消息對齊”的機(jī)制:如果某個上游出現(xiàn)延遲,當(dāng)前算子會停止從其它上游消費消息,直到延遲的上游趕上進(jìn)度,這樣就保證了算子中的狀態(tài)不會包含下一批次的記錄。顯然,這種方式會引入額外的延遲,因此除了這種 EXACTLY_ONCE 模式,我們也可將檢查點配置為 AT_LEAST_ONCE,以獲得更高的吞吐量。具體方式請參考 官方文檔。

可重放的數(shù)據(jù)源

當(dāng)出錯的腳本需要從上一個檢查點恢復(fù)時,F(xiàn)link 必須對數(shù)據(jù)進(jìn)行重放,這就要求數(shù)據(jù)源支持這一功能。Kafka 是目前使用得較多的消息隊列,且支持從特定位點進(jìn)行消費。具體來說,F(xiàn)linkKafkaConsumer 類實現(xiàn)了 CheckpointedFunction 接口,會在檢查點中存放主題名、分區(qū)名、以及偏移量:

abstract class FlinkKafkaConsumerBase implements CheckpointedFunction {
    public void initializeState(FunctionInitializationContext context) {
        OperatorStateStore stateStore = context.getOperatorStateStore();
        this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor < > (
            OFFSETS_STATE_NAME,
            TypeInformation.of(new TypeHint < Tuple2 < KafkaTopicPartition, Long >> () {})));
        if (context.isRestored()) {
            for (Tuple2 < KafkaTopicPartition, Long > kafkaOffset: unionOffsetStates.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }
        }
    }
    public void snapshotState(FunctionSnapshotContext context) {
        unionOffsetStates.clear();
        for (Map.Entry < KafkaTopicPartition, Long > kafkaTopicPartitionLongEntry: currentOffsets.entrySet()) {
            unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),
                kafkaTopicPartitionLongEntry.getValue()));
        }
    }
}

當(dāng)數(shù)據(jù)源算子從檢查點或暫存點恢復(fù)時,我們可以在 TaskManager 的日志中看到以下信息,表明當(dāng)前消費的偏移量是從算子狀態(tài)中恢復(fù)出來的:

2018-12-23 10:56:47,380 INFO FlinkKafkaConsumerBase
Consumer subtask 0 will start reading 2 partitions with offsets in restored state:
{KafkaTopicPartition{topic='flink_test', partition=1}=725,
KafkaTopicPartition{topic='flink_test', partition=0}=721}

恢復(fù)寫入中的文件

程序運行過程中,StreamingFileSink 首先會將結(jié)果寫入中間文件,以 . 開頭、in-progress 結(jié)尾。這些中間文件會在符合一定條件后更名為正式文件,取決于用戶配置的 RollingPolicy,默認(rèn)策略是基于時間(60 秒)和基于大?。?28 MB)。當(dāng)腳本出錯或重啟時,中間文件會被直接關(guān)閉;在恢復(fù)時,由于檢查點中保存了中間文件名和成功寫入的長度,程序會重新打開這些文件,切割到指定長度(Truncate),然后繼續(xù)寫入。這樣一來,文件中就不會包含檢查點之后的記錄了,從而實現(xiàn) Exactly-once。

以 Hadoop 文件系統(tǒng)舉例,恢復(fù)的過程是在 HadoopRecoverableFsDataOutputStream 類的構(gòu)造函數(shù)中進(jìn)行的。它會接收一個 HadoopFsRecoverable 類型的結(jié)構(gòu),里面包含了中間文件的路徑和長度。這個對象是 BucketState 的成員,會被保存在檢查點中。

HadoopRecoverableFsDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable) {
    this.tempFile = checkNotNull(recoverable.tempFile());
    truncate(fs, tempFile, recoverable.offset());
    out = fs.append(tempFile);
}

結(jié)論

Apache Flink 構(gòu)建在實時處理之上,從設(shè)計之初就充分考慮了中間狀態(tài)的保存,而且能夠很好地與現(xiàn)有 Hadoop 生態(tài)環(huán)境結(jié)合,因而在大數(shù)據(jù)領(lǐng)域非常有競爭力。它還在高速發(fā)展之中,近期也引入了 Table API、流式 SQL、機(jī)器學(xué)習(xí)等功能,像阿里巴巴這樣的公司也在大量使用和貢獻(xiàn)代碼。Flink 的應(yīng)用場景眾多,有很大的發(fā)展?jié)摿?,值得一試?span toymoban-style="hidden">文章來源地址http://www.zghlxwxcb.cn/news/detail-450776.html

到了這里,關(guān)于使用 Apache Flink 開發(fā)實時 ETL的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 數(shù)據(jù)架構(gòu)的實時分析:Apache Flink 和 Apache Storm 的比較

    實時數(shù)據(jù)處理在大數(shù)據(jù)領(lǐng)域具有重要意義,它可以幫助企業(yè)更快地獲取和分析數(shù)據(jù),從而更快地做出決策。隨著數(shù)據(jù)量的增加,傳統(tǒng)的批處理方法已經(jīng)不能滿足企業(yè)的需求,因此需要使用實時數(shù)據(jù)處理技術(shù)。 Apache Flink 和 Apache Storm 是兩個流行的實時數(shù)據(jù)處理框架,它們都可以

    2024年01月23日
    瀏覽(28)
  • 【大數(shù)據(jù)-實時流計算】圖文詳解 Apache Flink 架構(gòu)原理

    目錄 Apache?Flink架構(gòu)介紹 一、Flink組件棧 二、Flink運行時架構(gòu) 在Flink的整個

    2024年02月02日
    瀏覽(22)
  • Kudu與Apache Flink的集成:實時數(shù)據(jù)處理的新方法

    隨著數(shù)據(jù)的增長,實時數(shù)據(jù)處理變得越來越重要。傳統(tǒng)的批處理系統(tǒng)已經(jīng)不能滿足現(xiàn)在的需求。因此,實時數(shù)據(jù)處理技術(shù)逐漸成為了研究的熱點。Kudu和Apache Flink是兩個非常重要的實時數(shù)據(jù)處理系統(tǒng),它們各自具有獨特的優(yōu)勢。Kudu是一個高性能的列式存儲系統(tǒng),適用于實時數(shù)

    2024年02月21日
    瀏覽(23)
  • 【Apache-StreamPark】Flink 開發(fā)利器 StreamPark 的介紹、安裝、使用

    【Apache-StreamPark】Flink 開發(fā)利器 StreamPark 的介紹、安裝、使用

    StreamPark 核心由 streampark-core 和 streampark-console 組成 之前我們寫 Flink SQL 基本上都是使用 Java 包裝 SQL,打 jar 包,提交到服務(wù)器上。通過命令行方式提交代碼,但這種方式始終不友好,流程繁瑣,開發(fā)和運維成本太大。我們希望能夠進(jìn)一步簡化流程,將 Flink TableEnvironment 抽象出

    2024年02月02日
    瀏覽(28)
  • 如何基于 Apache Doris 與 Apache Flink 快速構(gòu)建極速易用的實時數(shù)倉

    如何基于 Apache Doris 與 Apache Flink 快速構(gòu)建極速易用的實時數(shù)倉

    隨著大數(shù)據(jù)應(yīng)用的不斷深入,企業(yè)不再滿足離線數(shù)據(jù)加工計算的時效,實時數(shù)據(jù)需求已成為數(shù)據(jù)應(yīng)用新常態(tài)。伴隨著實時分析需求的不斷膨脹,傳統(tǒng)的數(shù)據(jù)架構(gòu)面臨的成本高、實時性無法保證、組件繁冗、運維難度高等問題日益凸顯。為了適應(yīng)業(yè)務(wù)快速迭代的特點,幫助企業(yè)

    2024年02月12日
    瀏覽(18)
  • Apache Flink X Apache Doris構(gòu)建極速易用的實時數(shù)倉架構(gòu)

    Apache Flink X Apache Doris構(gòu)建極速易用的實時數(shù)倉架構(gòu)

    大家好,我叫王磊。是SelectDB 大數(shù)據(jù)研發(fā)。今天給大家?guī)淼姆窒硎恰禔pache Flink X Apache Doris構(gòu)建極速易用的實時數(shù)倉架構(gòu)》。 下面是我們的個人介紹:我是Apache Doris Contributor 和阿里云 MVP。同時著有《 圖解 Spark 大數(shù)據(jù)快速分析實戰(zhàn)》等書籍。 接下來咱們進(jìn)入本次演講的正題

    2023年04月24日
    瀏覽(22)
  • Flink實時寫入Apache Doris如何保證高吞吐和低延遲

    Flink實時寫入Apache Doris如何保證高吞吐和低延遲

    隨著實時分析需求的不斷增加,數(shù)據(jù)的時效性對于企業(yè)的精細(xì)化運營越來越重要。借助海量數(shù)據(jù),實時數(shù)倉在有效挖掘有價值信息、快速獲取數(shù)據(jù)反饋、幫助企業(yè)更快決策、更好的產(chǎn)品迭代等方面發(fā)揮著不可替代的作用。 在這種情況下,Apache Doris 作為一個實時 MPP 分析數(shù)據(jù)庫脫穎

    2024年01月17日
    瀏覽(33)
  • 流數(shù)據(jù)湖平臺Apache Paimon(三)Flink進(jìn)階使用

    流數(shù)據(jù)湖平臺Apache Paimon(三)Flink進(jìn)階使用

    2.9.1 寫入性能 Paimon的寫入性能與檢查點密切相關(guān),因此需要更大的寫入吞吐量: 增加檢查點間隔,或者僅使用批處理模式。 增加寫入緩沖區(qū)大小。 啟用寫緩沖區(qū)溢出。 如果您使用固定存儲桶模式,請重新調(diào)整存儲桶數(shù)量。 2.9.1.1 并行度 建議sink的并行度小于等于bucket的數(shù)量

    2024年02月09日
    瀏覽(21)
  • 大數(shù)據(jù)_面試_ETL組件常見問題_spark&flink

    問題列表 回答 spark與flink的主要區(qū)別 flink cdc如何確保冪等與一致性 Flink SQL CDC 實踐以及一致性分析-阿里云開發(fā)者社區(qū) spark 3.0 AQE動態(tài)優(yōu)化 hbase memorystore blockcache sparksql如何調(diào)優(yōu) 通過webui定位那個表以及jobid,jobid找對應(yīng)的執(zhí)行計劃 hdfs的常見的壓縮算法 hbase的數(shù)據(jù)傾斜 spark數(shù)據(jù)處

    2024年02月16日
    瀏覽(25)
  • 使用flink實現(xiàn)《實時數(shù)據(jù)分析》的案例 java版

    本文檔介紹了使用Java和Flink實現(xiàn)實時數(shù)據(jù)分析的案例。該案例使用Flink的流處理功能,從Kafka主題中讀取數(shù)據(jù),進(jìn)行實時處理和分析,并將結(jié)果輸出到Elasticsearch中。 Java 8 Flink 1.13.2 Kafka 2.8.0 Elasticsearch 7.13.4 本案例使用Kafka作為數(shù)據(jù)源,從一個名為 user_behavior 的主題中讀取數(shù)據(jù)。

    2024年02月08日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包