国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

6.2、Flink數(shù)據(jù)寫(xiě)入到Kafka

這篇具有很好參考價(jià)值的文章主要介紹了6.2、Flink數(shù)據(jù)寫(xiě)入到Kafka。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

目錄

1、添加POM依賴

2、API使用說(shuō)明

3、序列化器

3.1 使用預(yù)定義的序列化器

3.2 使用自定義的序列化器

4、容錯(cuò)保證級(jí)別

4.1?至少一次 的配置

4.2?精確一次 的配置

5、這是一個(gè)完整的入門(mén)案例


1、添加POM依賴

Apache Flink 集成了通用的 Kafka 連接器,使用時(shí)需要根據(jù)生產(chǎn)環(huán)境的版本引入相應(yīng)的依賴

<!-- 引入 kafka連接器依賴-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.1</version>
</dependency>

2、API使用說(shuō)明

KafkaSink?可將數(shù)據(jù)流寫(xiě)入一個(gè)或多個(gè) Kafka topic。

官網(wǎng)鏈接:官網(wǎng)鏈接

6.2、Flink數(shù)據(jù)寫(xiě)入到Kafka,# Flink API 使用技巧,flink,kafka,linq

DataStream<String> stream = ...;
        
KafkaSink<String> sink = KafkaSink.<String>builder()  // 泛型為 輸入輸入的類(lèi)型
        // TODO 必填項(xiàng):配置 kafka 的地址和端口
        .setBootstrapServers(brokers)
        // TODO 必填項(xiàng):配置消息序列化器信息 Topic名稱、消息序列化器類(lèi)型
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        // TODO 必填項(xiàng):配置容錯(cuò)保證級(jí)別 精準(zhǔn)一次、至少一次、不做任何保證
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
        
stream.sinkTo(sink);

3、序列化器

序列化器的作用是將flink數(shù)據(jù)轉(zhuǎn)換成 kafka的ProducerRecord

6.2、Flink數(shù)據(jù)寫(xiě)入到Kafka,# Flink API 使用技巧,flink,kafka,linq

3.1 使用預(yù)定義的序列化器

功能:將?DataStream 數(shù)據(jù)轉(zhuǎn)換為 Kafka消息中的value,key為默認(rèn)值null,timestamp為默認(rèn)值

// 初始化 KafkaSink 實(shí)例
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        // TODO 必填項(xiàng):配置 kafka 的地址和端口
        .setBootstrapServers("worker01:9092")
        // TODO 必填項(xiàng):配置消息序列化器信息 Topic名稱、消息序列化器類(lèi)型
        .setRecordSerializer(
                KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("20230912")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
        )
        .build();

3.2 使用自定義的序列化器

功能:可以對(duì) kafka消息的key、value、partition、timestamp進(jìn)行賦值

/**
 * 如果要指定寫(xiě)入kafka的key,可以自定義序列化器:
 * 		1、實(shí)現(xiàn) 一個(gè)接口,重寫(xiě) 序列化 方法
 * 		2、指定key,轉(zhuǎn)成 字節(jié)數(shù)組
 * 		3、指定value,轉(zhuǎn)成 字節(jié)數(shù)組
 * 		4、返回一個(gè) ProducerRecord對(duì)象,把key、value放進(jìn)去
 */
// 初始化 KafkaSink 實(shí)例 (自定義 KafkaRecordSerializationSchema 實(shí)例)
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        // TODO 必填項(xiàng):配置 kafka 的地址和端口
        .setBootstrapServers("worker01:9092")
        // TODO 必填項(xiàng):配置消息序列化器信息 Topic名稱、消息序列化器類(lèi)型
        .setRecordSerializer(
                new KafkaRecordSerializationSchema<String>() {

                    @Nullable
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                        String[] datas = element.split(",");
                        byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                        byte[] value = element.getBytes(StandardCharsets.UTF_8);
                        Long currTimestamp = System.currentTimeMillis();
                        Integer partition = 0;
                        return new ProducerRecord<>("20230913", partition, currTimestamp, key, value);
                    }
                }
        )
        .build();

4、容錯(cuò)保證級(jí)別

KafkaSink?總共支持三種不同的語(yǔ)義保證(DeliveryGuarantee

  • DeliveryGuarantee.NONE? ?不提供任何保證
    • 消息有可能會(huì)因 Kafka broker 的原因發(fā)生丟失或因 Flink 的故障發(fā)生重復(fù)
  • DeliveryGuarantee.AT_LEAST_ONCE??至少一次
    • sink 在 checkpoint 時(shí)會(huì)等待 Kafka 緩沖區(qū)中的數(shù)據(jù)全部被 Kafka producer 確認(rèn)。
    • 消息不會(huì)因 Kafka broker 端發(fā)生的事件而丟失,但可能會(huì)在 Flink 重啟時(shí)重復(fù),因?yàn)?Flink 會(huì)重新處理舊數(shù)據(jù)。
  • DeliveryGuarantee.EXACTLY_ONCE 精確一次
    • 該模式下,Kafka sink 會(huì)將所有數(shù)據(jù)通過(guò)在 checkpoint 時(shí)提交的事務(wù)寫(xiě)入。
    • 因此,如果 consumer 只讀取已提交的數(shù)據(jù)(參見(jiàn) Kafka consumer 配置?isolation.level),在 Flink 發(fā)生重啟時(shí)不會(huì)發(fā)生數(shù)據(jù)重復(fù)。
    • 然而這會(huì)使數(shù)據(jù)在 checkpoint 完成時(shí)才會(huì)可見(jiàn),因此請(qǐng)按需調(diào)整 checkpoint 的間隔。
    • 請(qǐng)確認(rèn)事務(wù) ID 的前綴(transactionIdPrefix)對(duì)不同的應(yīng)用是唯一的,以保證不同作業(yè)的事務(wù) 不會(huì)互相影響!此外,強(qiáng)烈建議將 Kafka 的事務(wù)超時(shí)時(shí)間調(diào)整至遠(yuǎn)大于 checkpoint 最大間隔 + 最大重啟時(shí)間,否則 Kafka 對(duì)未提交事務(wù)的過(guò)期處理會(huì)導(dǎo)致數(shù)據(jù)丟失。

4.1?至少一次 的配置

DataStream<String> stream = ...;

// 初始化 KafkaSink 實(shí)例
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        // TODO 必填項(xiàng):配置 kafka 的地址和端口
        .setBootstrapServers("worker01:9092")
        // TODO 必填項(xiàng):配置消息序列化器信息 Topic名稱、消息序列化器類(lèi)型
        .setRecordSerializer(
                KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("20230912")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
        )
        // TODO 必填項(xiàng):配置容災(zāi)保證級(jí)別設(shè)置為 至少一次
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

stream.sinkTo(sink);

4.2?精確一次 的配置

// 如果是精準(zhǔn)一次,必須開(kāi)啟checkpoint
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

DataStream<String> stream = ...;
        
KafkaSink<String> sink = KafkaSink.<String>builder()  // 泛型為 輸入輸入的類(lèi)型
        // TODO 必填項(xiàng):配置 kafka 的地址和端口
        .setBootstrapServers(brokers)
        // TODO 必填項(xiàng):配置消息序列化器信息 Topic名稱、消息序列化器類(lèi)型
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        // TODO 必填項(xiàng):配置容災(zāi)保證級(jí)別設(shè)置為 精準(zhǔn)一次
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // 如果是精準(zhǔn)一次,必須設(shè)置 事務(wù)的前綴
        .setTransactionalIdPrefix("flink-")
        // 如果是精準(zhǔn)一次,必須設(shè)置 事務(wù)超時(shí)時(shí)間: 大于checkpoint間隔,小于 max 15分鐘
        .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
        .build();
        
stream.sinkTo(sink);

5、這是一個(gè)完整的入門(mén)案例

需求:Flink實(shí)時(shí)讀取?socket數(shù)據(jù)源,將讀取到的數(shù)據(jù)寫(xiě)入到Kafka (要保證不丟失,不重復(fù))

開(kāi)發(fā)語(yǔ)言:java1.8

flink版本:flink1.17.0文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-709426.html

package com.baidu.datastream.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

// TODO flink 數(shù)據(jù)輸出到kafka
public class SinkKafka {
    public static void main(String[] args) throws Exception {
        // 1.獲取執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 如果是精準(zhǔn)一次,必須開(kāi)啟checkpoint
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        // 2.指定數(shù)據(jù)源
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 3.初始化 KafkaSink 實(shí)例
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // TODO 必填項(xiàng):配置 kafka 的地址和端口
                .setBootstrapServers("worker01:9092")
                // TODO 必填項(xiàng):配置消息序列化器信息 Topic名稱、消息序列化器類(lèi)型
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("20230912")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // TODO 必填項(xiàng):配置容災(zāi)保證級(jí)別設(shè)置為 精準(zhǔn)一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精準(zhǔn)一次,必須設(shè)置 事務(wù)的前綴
                .setTransactionalIdPrefix("flink-")
                // 如果是精準(zhǔn)一次,必須設(shè)置 事務(wù)超時(shí)時(shí)間: 大于checkpoint間隔,小于 max 15分鐘
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
                .build();

        streamSource.sinkTo(kafkaSink);

        // 3.觸發(fā)程序執(zhí)行
        env.execute();
    }
}

到了這里,關(guān)于6.2、Flink數(shù)據(jù)寫(xiě)入到Kafka的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【Flink-Kafka-To-Mongo】使用 Flink 實(shí)現(xiàn) Kafka 數(shù)據(jù)寫(xiě)入 Mongo(根據(jù)對(duì)應(yīng)操作類(lèi)型進(jìn)行增、刪、改操作,寫(xiě)入時(shí)對(duì)時(shí)間類(lèi)型字段進(jìn)行單獨(dú)處理)

    需求描述: 1、數(shù)據(jù)從 Kafka 寫(xiě)入 Mongo。 2、相關(guān)配置存放于 Mysql 中,通過(guò) Mysql 進(jìn)行動(dòng)態(tài)讀取。 3、此案例中的 Kafka 是進(jìn)行了 Kerberos 安全認(rèn)證的,如果不需要自行修改。 4、Kafka 數(shù)據(jù)為 Json 格式,獲取到的數(shù)據(jù)根據(jù)操作類(lèi)型字段進(jìn)行增刪改操作。 5、讀取時(shí)使用自定義 Source,寫(xiě)

    2024年02月22日
    瀏覽(31)
  • flink:通過(guò)table api把文件中讀取的數(shù)據(jù)寫(xiě)入MySQL

    當(dāng)寫(xiě)入數(shù)據(jù)到外部數(shù)據(jù)庫(kù)時(shí),F(xiàn)link 會(huì)使用 DDL 中定義的主鍵。如果定義了主鍵,則連接器將以 upsert 模式工作,否則連接器將以 append 模式工作 文件info.txt

    2024年03月15日
    瀏覽(18)
  • flink寫(xiě)入到kafka 大坑解析。

    flink寫(xiě)入到kafka 大坑解析。

    1.kafka能不能發(fā)送null消息? ? ?能! 2 flink能不能發(fā)送null消息到kafka? 不能! ? ? 這里就報(bào)了java的最常見(jiàn)錯(cuò)誤 空指針,原因就是flink要把kafka的消息getbytes。所以flink不能發(fā)送null到kafka。 這種問(wèn)題會(huì)造成什么后果? flink直接掛掉。 如果我們采取了失敗重試機(jī)制會(huì)怎樣? 數(shù)據(jù)重

    2024年02月15日
    瀏覽(15)
  • 實(shí)戰(zhàn)Flink Java api消費(fèi)kafka實(shí)時(shí)數(shù)據(jù)落盤(pán)HDFS

    實(shí)戰(zhàn)Flink Java api消費(fèi)kafka實(shí)時(shí)數(shù)據(jù)落盤(pán)HDFS

    在Java api中,使用flink本地模式,消費(fèi)kafka主題,并直接將數(shù)據(jù)存入hdfs中。 flink版本1.13 kafka版本0.8 hadoop版本3.1.4 為了完成 Flink 從 Kafka 消費(fèi)數(shù)據(jù)并實(shí)時(shí)寫(xiě)入 HDFS 的需求,通常需要啟動(dòng)以下組件: 確保 Zookeeper 在運(yùn)行,因?yàn)?Flink 的 Kafka Consumer 需要依賴 Zookeeper。 確保 Kafka Serve

    2024年01月24日
    瀏覽(29)
  • flink日志實(shí)時(shí)采集寫(xiě)入Kafka/ElasticSearch

    flink日志實(shí)時(shí)采集寫(xiě)入Kafka/ElasticSearch

    由于公司想要基于flink的日志做實(shí)時(shí)預(yù)警功能,故需要實(shí)時(shí)接入,并刷入es進(jìn)行分析。 日志接入必須異步,不能影響服務(wù)性能 kafka集群宕機(jī),依舊能夠提交flink任務(wù)且運(yùn)行任務(wù) kafka集群掛起恢復(fù),可以依舊續(xù)寫(xiě)實(shí)時(shí)運(yùn)行日志 在類(lèi)上加上@Plugin注解,標(biāo)記為自定義appender 在類(lèi)加上

    2024年02月08日
    瀏覽(23)
  • 記一次Flink通過(guò)Kafka寫(xiě)入MySQL的過(guò)程

    記一次Flink通過(guò)Kafka寫(xiě)入MySQL的過(guò)程

    一、前言 總體思路:source --transform --sink ,即從source獲取相應(yīng)的數(shù)據(jù)來(lái)源,然后進(jìn)行數(shù)據(jù)轉(zhuǎn)換,將數(shù)據(jù)從比較亂的格式,轉(zhuǎn)換成我們需要的格式,轉(zhuǎn)換處理后,然后進(jìn)行sink功能,也就是將數(shù)據(jù)寫(xiě)入的相應(yīng)的數(shù)據(jù)庫(kù)DB中或者寫(xiě)入Hive的HDFS文件存儲(chǔ)。 思路: pom部分放到最后面。 二

    2024年01月24日
    瀏覽(28)
  • Flink流批一體計(jì)算(15):PyFlink Tabel API之SQL寫(xiě)入Sink

    目錄 舉個(gè)例子 寫(xiě)入Sink的各種情況 1. 將結(jié)果數(shù)據(jù)收集到客戶端 2. 將結(jié)果數(shù)據(jù)轉(zhuǎn)換為Pandas DataFrame,并收集到客戶端 3. 將結(jié)果寫(xiě)入到一張 Sink 表中 4. 將結(jié)果寫(xiě)入多張 Sink 表中 舉個(gè)例子 將計(jì)算結(jié)果寫(xiě)入給 sink 表 寫(xiě)入Sink的各種情況 1. 將結(jié)果數(shù)據(jù)收集到客戶端 你可以使用 TableR

    2024年02月11日
    瀏覽(19)
  • 使用Flink處理Kafka中的數(shù)據(jù)

    目錄 ????????使用Flink處理Kafka中的數(shù)據(jù) 前提: ?一,?使用Flink消費(fèi)Kafka中ProduceRecord主題的數(shù)據(jù) 具體代碼為(scala) 執(zhí)行結(jié)果 二, 使用Flink消費(fèi)Kafka中ChangeRecord主題的數(shù)據(jù)? ?????????具體代碼(scala) ????????????????具體執(zhí)行代碼① ? ? ? ????????? 重要邏

    2024年01月23日
    瀏覽(20)
  • Flink使用 KafkaSource消費(fèi) Kafka中的數(shù)據(jù)

    目前,很多 flink相關(guān)的書(shū)籍和網(wǎng)上的文章講解如何對(duì)接 kafka時(shí)都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已經(jīng)將 FlinkKafkaConsumer標(biāo)記為 deprecated(不推薦),如下: 新版本的 flink應(yīng)該使用 KafkaSource來(lái)消費(fèi) kafka中的數(shù)據(jù),詳細(xì)代碼如下: 開(kāi)發(fā)者在工作中應(yīng)該盡量避

    2024年02月15日
    瀏覽(22)
  • 掌握實(shí)時(shí)數(shù)據(jù)流:使用Apache Flink消費(fèi)Kafka數(shù)據(jù)

    掌握實(shí)時(shí)數(shù)據(jù)流:使用Apache Flink消費(fèi)Kafka數(shù)據(jù)

    ? ? ? ? 導(dǎo)讀:使用Flink實(shí)時(shí)消費(fèi)Kafka數(shù)據(jù)的案例是探索實(shí)時(shí)數(shù)據(jù)處理領(lǐng)域的絕佳方式。不僅非常實(shí)用,而且對(duì)于理解現(xiàn)代數(shù)據(jù)架構(gòu)和流處理技術(shù)具有重要意義。 ????????Apache Flink ?是一個(gè)在 有界 數(shù)據(jù)流和 無(wú)界 數(shù)據(jù)流上進(jìn)行有狀態(tài)計(jì)算分布式處理引擎和框架。Flink 設(shè)計(jì)旨

    2024年02月03日
    瀏覽(31)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包