国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink1.14新版KafkaSource和KafkaSink實踐使用(自定義反序列化器、Topic選擇器、序列化器、分區(qū)器)

這篇具有很好參考價值的文章主要介紹了Flink1.14新版KafkaSource和KafkaSink實踐使用(自定義反序列化器、Topic選擇器、序列化器、分區(qū)器)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

前言

在官方文檔的描述中,API FlinkKafkaConsumer和FlinkKafkaProducer將在后續(xù)版本陸續(xù)棄用、移除,所以在未來生產(chǎn)中有版本升級的情況下,新API KafkaSource和KafkaSink還是有必要學(xué)會使用的。下面介紹下基于新API的一些自定義類以及主程序的簡單實踐。

官方案例

kafkasink,Flink,kafka,flink,scala,大數(shù)據(jù),Powered by 金山文檔
kafkasink,Flink,kafka,flink,scala,大數(shù)據(jù),Powered by 金山文檔
kafkasink,Flink,kafka,flink,scala,大數(shù)據(jù),Powered by 金山文檔
kafkasink,Flink,kafka,flink,scala,大數(shù)據(jù),Powered by 金山文檔
官方文檔地址:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/datastream/kafka/

KafkaSource的自定義類

自定義反序列化器

自定義反序列化器可以以指定的格式取到來源Kafka消息中我們想要的元素。該類需要繼承 KafkaDeserializationSchema ,這里簡單將來源Kafka的topic、key、value以Tuple3[String, String, String]的格式取出來。

MyKafkaDeserializationSchemaTuple3.scala

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

import java.nio.charset.StandardCharsets

/**
 * @author hushhhh
 */
class MyKafkaDeserializationSchemaTuple3 extends KafkaDeserializationSchema[(String, String, String)] {
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, String) = {
    new Tuple3[String, String, String](
      record.topic(),
      new String(record.key(), StandardCharsets.UTF_8),
      new String(record.value(), StandardCharsets.UTF_8))
  }

  override def isEndOfStream(nextElement: (String, String, String)): Boolean = false

  override def getProducedType: TypeInformation[(String, String, String)] = {
    TypeInformation.of(classOf[(String, String, String)])
  }
}

KafkaSink的自定義類

自定義Topic選擇器

自定義一個 TopicSelector 可以將流中多個topic里的數(shù)據(jù)根據(jù)一定邏輯分發(fā)到不同的目標(biāo)topic里。該類需要繼承 TopicSelector ,這里簡單根據(jù)來源Kafka的topic名拼接下。

MyTopicSelector.scala

import org.apache.flink.connector.kafka.sink.TopicSelector

/**
 * @author hushhhh
 */
class MyTopicSelector extends TopicSelector[(String, String, String)] {
  override def apply(t: (String, String, String)): String = {
? ? // t: 來源kafka的topic、key、value
    "TOPIC_" + t._1.toUpperCase()
  }
}

自定義序列化器

自定義序列化器可以將數(shù)據(jù)根據(jù)自己的業(yè)務(wù)格式寫到目標(biāo)Kafka的key和value里,這里將來源Kafka里的key和value直接寫出去,這兩個類都需要繼承 SerializationSchema 。

ProducerRecord Key的序列化器

MyKeySerializationSchema.scala

import org.apache.flink.api.common.serialization.SerializationSchema

/**
 * @author hushhhh
 */
class MyKeySerializationSchema extends SerializationSchema[(String, String, String)] {
  override def serialize(element: (String, String, String)): Array[Byte] = {
? ? // element: 來源kafka的topic、key、value
    element._2.getBytes()
  }
}

ProducerRecord Value的序列化器

MyValueSerializationSchema.scala

import org.apache.flink.api.common.serialization.SerializationSchema

/**
 * @author hushhhh
 */
class MyValueSerializationSchema extends SerializationSchema[(String, String, String)] {
  override def serialize(element: (String, String, String)): Array[Byte] = {
? ? // element: 來源kafka的topic、key、value
    element._3.getBytes()
  }
}

自定義分區(qū)器

自定義分區(qū)器可以根據(jù)具體邏輯對要寫到目標(biāo)Kafka 里的數(shù)據(jù)進(jìn)行partition分配。該類需要繼承 FlinkKafkaPartitioner ,這里根據(jù)key的hash分配到不同的partition里(如果目標(biāo)topic有多個partition的話)。

MyPartitioner.scala

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner

/**
 * @author hushhhh
 */
class MyPartitioner extends FlinkKafkaPartitioner[(String, String, String)] {
  override def partition(record: (String, String, String), key: Array[Byte], value: Array[Byte], targetTopic: String, partitions: Array[Int]): Int = {
? ? // record: 來源kafka的topic、key、value
    Math.abs(new String(record._2).hashCode % partitions.length)
  }
}

主類

Main.scala

import format.{MyKafkaDeserializationSchemaTuple3, MyKeySerializationSchema, MyPartitioner, MyTopicSelector, MyValueSerializationSchema}
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.kafka.clients.consumer.OffsetResetStrategy

import java.util.Properties
import scala.collection.JavaConverters._

/**
 * @author hushhhh
 */
object Main {
  def main(args: Array[String]): Unit = {
    /**
     * env
     */
    // stream環(huán)境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
     * source
     */
    // 定義 KafkaSource
    lazy val kafkaSource: KafkaSource[(String, String, String)] = KafkaSource.builder()
      // Kafka消費者的各種配置文件,此處省略配置
      .setProperties(new Properties())
      // 配置消費的一個或多個topic
      .setTopics("sourceTopic1,sourceTopic2,...".split(",", -1).toList.asJava)
      // 開始消費位置,從已提交的offset開始消費,沒有的話從最新的消息開始消費
      .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
      // 反序列化,使用之前我們自定義的反序列化器
      .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializationSchemaTuple3))
      .build()
    // 添加 kafka source
    val inputDS: DataStream[(String, String, String)] = env.fromSource(
      kafkaSource,
      WatermarkStrategy.noWatermarks(),
      "MyKafkaSource")
      .setParallelism(1)

    /**
     * transformation
     */
    // 數(shù)據(jù)加工處理,此處省略

    /**
     * sink
     */
? ? // 定義 KafkaSink
    lazy val kafkaSink: KafkaSink[(String, String, String)] =
      KafkaSink.builder[(String, String, String)]()
        // 目標(biāo)集群地址
        .setBootstrapServers("bootstrap.servers")
        // Kafka生產(chǎn)者的各種配置文件,此處省略配置
        .setKafkaProducerConfig(new Properties())
        // 定義消息的序列化模式
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
          // Topic選擇器,使用之前我們自定義的Topic選擇器
          .setTopicSelector(new MyTopicSelector)
          // Key的序列化器,使用之前我們自定義的Key序列化器
          .setKeySerializationSchema(new MyKeySerializationSchema)
          // Value的序列化器,使用之前我們自定義的Value序列化器
          .setValueSerializationSchema(new MyValueSerializationSchema)
          // 自定義分區(qū)器,使用之前我們自定義的自定義分區(qū)器
          .setPartitioner(new MyPartitioner)
          .build()
        )
        // 語義保證,保證至少一次
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()

    // 添加 kafka sink
    inputDS.sinkTo(kafkaSink)
      .name("MyKafkaSink")
      .setParallelism(1)

    /**
     * execute
     */
    env.execute("myJob")
  }

}

以上就是KafkaSource和KafkaSink API的簡單使用。大佬們感覺有用的話點個贊吧~??文章來源地址http://www.zghlxwxcb.cn/news/detail-812725.html

到了這里,關(guān)于Flink1.14新版KafkaSource和KafkaSink實踐使用(自定義反序列化器、Topic選擇器、序列化器、分區(qū)器)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • flink1.17 自定義trigger ContinuousEventTimeTrigger

    在?ContinuousEventTimeTrigger 的基礎(chǔ)上新增了timeout,如果超時后窗口都沒關(guān)閉,那么就硬輸出一波,避免間斷數(shù)據(jù),留存窗口太久. ContinuousEventTimeTrigger連續(xù)事件時間觸發(fā)器與ContinuousProcessingTimeTrigger連續(xù)處理時間觸發(fā)器,指定一個固定時間間隔interval,不需要等到窗口結(jié)束才能獲取結(jié)果

    2024年02月14日
    瀏覽(41)
  • Flink1.14提交任務(wù)報錯classloader.check-leaked-classloader問題解決

    我的hadoop版本是3.1.3,F(xiàn)link版本是1.14。不知道是hadoop版本的原因還是Flink版本更新的原因。當(dāng)我運行一個簡單的Flink測試時,雖然結(jié)果出來了但是后面還跟著一段報錯信息。 測試命令: flink run -m yarn-cluster -p 2 -yjm 2G -ytm 2G $FLINK_HOME/examples/batch/WordCount.jar 報錯信息: Trying to acce

    2024年02月11日
    瀏覽(26)
  • Flink使用 KafkaSource消費 Kafka中的數(shù)據(jù)

    目前,很多 flink相關(guān)的書籍和網(wǎng)上的文章講解如何對接 kafka時都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已經(jīng)將 FlinkKafkaConsumer標(biāo)記為 deprecated(不推薦),如下: 新版本的 flink應(yīng)該使用 KafkaSource來消費 kafka中的數(shù)據(jù),詳細(xì)代碼如下: 開發(fā)者在工作中應(yīng)該盡量避

    2024年02月15日
    瀏覽(21)
  • 實時數(shù)倉|基于Flink1.11的SQL構(gòu)建實時數(shù)倉探索實踐

    實時數(shù)倉主要是為了解決傳統(tǒng)數(shù)倉數(shù)據(jù)時效性低的問題,實時數(shù)倉通常會用在實時的 OLAP 分析、實時的數(shù)據(jù)看板、業(yè)務(wù)指標(biāo)實時監(jiān)控等場景。雖然關(guān)于實時數(shù)倉的架構(gòu)及技術(shù)選型與傳統(tǒng)的離線數(shù)倉會存在差異,但是關(guān)于數(shù)倉建設(shè)的基本方法論是一致的。本文會分享基于 Flink

    2024年02月16日
    瀏覽(22)
  • 14-部署Kafkasource和KafkaChannel

    14-部署Kafkasource和KafkaChannel

    部署KafkaSource KafkaSource負(fù)責(zé)將Kafka中的消息記錄轉(zhuǎn)為CloudEvents 僅在需要從Kafka中加載消息并提供給Knative Eventing上的應(yīng)用程序使用時才需要KafkaSource 命令: 部署KafkaChannel 負(fù)責(zé)在Knative Eventing上提供基于Kafka集群的Channel實現(xiàn),后端基于Kafka Topic https://knative.dev/docs/install/yaml-install/e

    2024年01月19日
    瀏覽(18)
  • Flink KafkaSink分區(qū)配置的不同版本對比

    Flink KafkaSink分區(qū)配置的不同版本對比

    Flink KafkaSink分區(qū)配置的不同版本對比 在不同版本的Flink中,KafkaSink 分區(qū)默認(rèn)配置方式可能會有一些變化。以下是摘自Flink官方文檔不同版本的原文: 1. Flink版本:1.12~1.19 Sink 分區(qū) # 配置項 sink.partitioner 指定了從 Flink 分區(qū)到 Kafka 分區(qū)的映射關(guān)系。 默認(rèn)情況下,F(xiàn)link 使用 Kafka

    2024年04月24日
    瀏覽(21)
  • [flink1.14.4]Unable to create a source for reading table ‘default_catalog.default_database.new_buyer

    [flink1.14.4]Unable to create a source for reading table ‘default_catalog.default_database.new_buyer

    升級flink1.14.4報錯? Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table \\\'default_catalog.default_database.new_buyer_trade_order2\\\'? ? ?source表未加主鍵導(dǎo)致,注釋放開,提交成功

    2024年02月15日
    瀏覽(19)
  • flink1.16使用消費/生產(chǎn)kafka之DataStream

    flink高級版本后,消費kafka數(shù)據(jù)一種是Datastream 一種之tableApi。 上官網(wǎng)?Kafka | Apache Flink 引入依賴 flink和kafka的連接器,里面內(nèi)置了kafka-client 使用方法 很簡單一目了然。 topic和partition ?反序列化 其實就是實現(xiàn)接口?DeserializationSchema 的deserialize()方法 把byte轉(zhuǎn)為你想要的類型。 起

    2024年02月16日
    瀏覽(20)
  • 使用Flink1.16.0的SQLGateway遷移Hive SQL任務(wù)

    使用Flink1.16.0的SQLGateway遷移Hive SQL任務(wù)

    使用Flink的SQL Gateway遷移Hive SQL任務(wù) 我們有數(shù)萬個離線任務(wù),主要還是默認(rèn)的DataPhin調(diào)度CDP集群的Hive On Tez這種低成本任務(wù),當(dāng)然也有PySpark、打Jar包的Spark和打Jar包的Flink任務(wù)這種高成本的任務(wù)【Java和Scala都有】。畢竟SQL上手門檻極低,是個人都能寫幾下并且跑起來,還可以很容

    2023年04月08日
    瀏覽(22)
  • 【FLink消費Kafka之FlinkConsumer到KafkaSource的轉(zhuǎn)變】

    上篇介紹了flink的入門程序wordcount,在項目開發(fā)過程中,最常接觸的還是跟各種源頭系統(tǒng)打交道,其中消費接收kafka中的數(shù)據(jù)是最常見的情況,而flink在1.15版本后連接kafka的依賴包發(fā)生了變化,之前的flink版本使用的依賴包是flink-connector-kafka_2.1x(后面的數(shù)字代表kafka環(huán)境的sca

    2024年01月15日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包