国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

大數(shù)據(jù)之使用Flink消費(fèi)Kafka中topic為ods_mall_log的數(shù)據(jù),根據(jù)不同的表前綴區(qū)分在存入Kafka的topic當(dāng)中

這篇具有很好參考價(jià)值的文章主要介紹了大數(shù)據(jù)之使用Flink消費(fèi)Kafka中topic為ods_mall_log的數(shù)據(jù),根據(jù)不同的表前綴區(qū)分在存入Kafka的topic當(dāng)中。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

前言

題目:

一、讀題分析

二、處理過程

? 1.數(shù)據(jù)處理部分:

2.HBaseSink(未經(jīng)測試,不能證明其正確性,僅供參考?。?/p>

三、重難點(diǎn)分析

總結(jié)?

什么是HBase?


前言

本題來源于全國職業(yè)技能大賽之大數(shù)據(jù)技術(shù)賽項(xiàng)賽題 - 電商數(shù)據(jù)處理 - 實(shí)時(shí)數(shù)據(jù)處理

注:由于設(shè)備問題,代碼執(zhí)行結(jié)果以及數(shù)據(jù)的展示無法給出,可參照我以往的博客其中有相同數(shù)據(jù)源展示

題目:

????????使用Flink消費(fèi)Kafka中topic為ods_mall_log的數(shù)據(jù),根據(jù)數(shù)據(jù)中不同的表前綴區(qū)分,將數(shù)據(jù)分別分發(fā)至kafka的DWD層的dim_customer_login_log的Topic中,其他的表則無需處理;


提示:以下是本篇文章正文內(nèi)容,下面案例可供參考(使用Scala語言編寫)?

一、讀題分析

涉及組件:Scala,F(xiàn)link,Kafka,HBase

涉及知識(shí)點(diǎn):

  1. Flink函數(shù)的使用
  2. 了解HBase,基本使用HBase

二、處理過程

? 1.數(shù)據(jù)處理部分:


import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

import java.util.Properties

object answer2 {
  def main(args: Array[String]): Unit = {
    import moduleC.test.HBaseSink2
    import org.apache.flink.streaming.api.scala.DataStream

    //    創(chuàng)建flink流環(huán)境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //    創(chuàng)建Kafka的配置
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "bigdata1:9092")

    //    創(chuàng)建Kafka的消費(fèi)者
    val kafkaConsumer = new FlinkKafkaConsumer[String]("ods_mall_log", new SimpleStringSchema(), properties)

    //    讀取消費(fèi)的數(shù)據(jù)
    val kafkaStream = env.addSource(kafkaConsumer)

    val newStream: DataStream[(String, String)] = kafkaStream
      .map(
        line => {
          val tablename = line.split(":")(0)
          val data = line.split(":")(1).stripPrefix("(").stripSuffix(");")
          //        12115|7611|0|0|'20230407111600'
          (tablename, data)
        }
      ).filter(_._1 == "customer_login_log")

    newStream.print()
    //    創(chuàng)建Kafka的生產(chǎn)者
    val dwdKafkaProduce = new FlinkKafkaProducer[String]("dim_customer_login_log", new SimpleStringSchema(), properties)
    //    val dwdKafkaProduce2 = new FlinkKafkaProducer[String]("dwd-topic1", new SimpleStringSchema(), properties)

    //    向Kafka發(fā)送數(shù)據(jù)
    //    newStream.addSink(dwdKafkaProduce)    不調(diào)用map方法會(huì)報(bào)錯(cuò)因?yàn)樾枰粋€(gè)string而不是(string,string)
    val result = newStream.map(line => line._2)
    result.addSink(dwdKafkaProduce)

    //    發(fā)到HBase上
    result.addSink(new HBaseSink2)

    //    execute
    env.execute("flinkKafkaToKafka")
  }
}

2.HBaseSink(未經(jīng)測試,不能證明其正確性,僅供參考?。?/h4>
package moduleC.test

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory

class HBaseSink(tableName: String, columnFamily: String) extends RichSinkFunction[String] {

   private var connection: client.Connection = _
   private var table: client.Table = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // 創(chuàng)建 HBase 連接
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "bigdata1")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    connection = ConnectionFactory.createConnection(conf)
    // HBase 表信息
    table = connection.getTable(TableName.valueOf(tableName))
  }

  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    val put = new Put(Bytes.toBytes(value))
    put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("data"), Bytes.toBytes(value))
    table.put(put)
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }

}

三、重難點(diǎn)分析

本題的難點(diǎn)主要在以下幾個(gè)方面:

  1. Flink對Kafka的集成:需要了解如何使用Flink作為Kafka消費(fèi)者來消費(fèi)Kafka中的數(shù)據(jù),并掌握Flink如何處理流式數(shù)據(jù)。

  2. 數(shù)據(jù)表的前綴區(qū)分:需要能夠識(shí)別數(shù)據(jù)中不同表的前綴,并根據(jù)前綴將數(shù)據(jù)分別分發(fā)到不同的DWD層Topic中。

  3. Kafka的使用:需要了解Kafka的基本原理和API使用,能夠編寫代碼將數(shù)據(jù)發(fā)送到指定的DWD層Topic中。

  4. 數(shù)據(jù)處理:需要針對數(shù)據(jù)進(jìn)行必要的處理和轉(zhuǎn)換,以便將其發(fā)送到正確的目標(biāo)Topic中。

綜上,本題需要具備Flink、Kafka和數(shù)據(jù)處理的基本知識(shí)和編程能力,同時(shí)需要具備一定的調(diào)試能力,因此可能對初學(xué)者而言會(huì)有一定的難度。

?文章來源地址http://www.zghlxwxcb.cn/news/detail-768256.html

  1. 在根據(jù)前綴區(qū)分那塊,需要了解Flink對與數(shù)據(jù)拆解的方法和一些使用的技巧
  2. HBaseSink的編寫,這是一個(gè)難點(diǎn),需要了解HBaseSink的原理,相對于其他數(shù)據(jù)倉庫來說算比較復(fù)雜的了。使用前需要開啟Hadoop,它是基于Hadoop hdfs的。

總結(jié)?

什么是HBase?

????????HBase是一個(gè)Apache Hadoop生態(tài)系統(tǒng)中的分布式NoSQL數(shù)據(jù)庫。它是一個(gè)面向列的數(shù)據(jù)庫,旨在提供高度可擴(kuò)展性和可靠性,以便處理大規(guī)模數(shù)據(jù)集。HBase的設(shè)計(jì)靈感來自于Google的Bigtable論文,并且提供了類似于Bigtable的數(shù)據(jù)模型和API,但是開源和可擴(kuò)展。

????????HBase的數(shù)據(jù)存儲(chǔ)在Hadoop HDFS(Hadoop分布式文件系統(tǒng))上,并且可以在多個(gè)節(jié)點(diǎn)上分布式存儲(chǔ)和處理。它支持高并發(fā)讀寫操作,具有快速的隨機(jī)讀/寫訪問速度,并且能夠處理PB級別的數(shù)據(jù)集。

????????HBase的應(yīng)用場景包括互聯(lián)網(wǎng)應(yīng)用、日志處理、社交網(wǎng)絡(luò)、金融服務(wù)、電信和游戲等領(lǐng)域,為這些領(lǐng)域提供了一種高效處理大量數(shù)據(jù)的解決方案。

????????以上來自網(wǎng)絡(luò)

?

????????請關(guān)注我的大數(shù)據(jù)技術(shù)專欄大數(shù)據(jù)技術(shù) 作者: Eternity.Arrebol

????????請關(guān)注我獲取更多與大數(shù)據(jù)相關(guān)的文章Eternity.Arrebol的博客

Q-歡迎在評論區(qū)進(jìn)行交流-Q

?

?

到了這里,關(guān)于大數(shù)據(jù)之使用Flink消費(fèi)Kafka中topic為ods_mall_log的數(shù)據(jù),根據(jù)不同的表前綴區(qū)分在存入Kafka的topic當(dāng)中的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • kafka消費(fèi)多個(gè)topic的使用

    我們在業(yè)務(wù)中難免遇到一個(gè)kafka消費(fèi)多個(gè)topic的消息,本文幫助大家如何在業(yè)務(wù)中用一個(gè)類消費(fèi)多個(gè)topic消息 配置類1

    2024年02月11日
    瀏覽(35)
  • Flink使用 KafkaSource消費(fèi) Kafka中的數(shù)據(jù)

    目前,很多 flink相關(guān)的書籍和網(wǎng)上的文章講解如何對接 kafka時(shí)都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已經(jīng)將 FlinkKafkaConsumer標(biāo)記為 deprecated(不推薦),如下: 新版本的 flink應(yīng)該使用 KafkaSource來消費(fèi) kafka中的數(shù)據(jù),詳細(xì)代碼如下: 開發(fā)者在工作中應(yīng)該盡量避

    2024年02月15日
    瀏覽(21)
  • 掌握實(shí)時(shí)數(shù)據(jù)流:使用Apache Flink消費(fèi)Kafka數(shù)據(jù)

    掌握實(shí)時(shí)數(shù)據(jù)流:使用Apache Flink消費(fèi)Kafka數(shù)據(jù)

    ? ? ? ? 導(dǎo)讀:使用Flink實(shí)時(shí)消費(fèi)Kafka數(shù)據(jù)的案例是探索實(shí)時(shí)數(shù)據(jù)處理領(lǐng)域的絕佳方式。不僅非常實(shí)用,而且對于理解現(xiàn)代數(shù)據(jù)架構(gòu)和流處理技術(shù)具有重要意義。 ????????Apache Flink ?是一個(gè)在 有界 數(shù)據(jù)流和 無界 數(shù)據(jù)流上進(jìn)行有狀態(tài)計(jì)算分布式處理引擎和框架。Flink 設(shè)計(jì)旨

    2024年02月03日
    瀏覽(31)
  • Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費(fèi)

    Kafka3.1部署和Topic主題數(shù)據(jù)生產(chǎn)與消費(fèi)

    本章節(jié)主要講述Kafka3.1X版本在Windows11主機(jī)下部署以及JAVA對Kafka應(yīng)用: 1.安裝JDK配置環(huán)境變量 2.Zookeeper(zookeeper-3.7.1) zk 部署后的目錄位置:D:setupapache-zookeeper-3.7.1 3.安裝Kafka3.1X 3.1 下載包(kafka_2.12-3.1.2.tgz) Kafka 3.2、 解壓并進(jìn)入Kafka目錄: 根目錄:D:setupkafka3.1.2 3、 編輯

    2024年02月09日
    瀏覽(23)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定義 Sink 消費(fèi) Kafka 數(shù)據(jù)寫入 RocketMQ

    這里的 maven 依賴比較冗余,推薦大家都加上,后面陸續(xù)優(yōu)化。 注意: 1、此程序中所有的相關(guān)配置都是通過 Mysql 讀取的(生產(chǎn)環(huán)境中沒有直接寫死的,都是通過配置文件動(dòng)態(tài)配置),大家實(shí)際測試過程中可以將相關(guān)配置信息寫死。 2、此程序中 Kafka 涉及到了 Kerberos 認(rèn)證操作

    2024年02月03日
    瀏覽(21)
  • 多個(gè)消費(fèi)者訂閱一個(gè)Kafka的Topic(使用@KafkaListener和KafkaTemplate)

    記錄 :465 場景 :一個(gè)Producer在一個(gè)Topic發(fā)布消息,多個(gè)消費(fèi)者Consumer訂閱Kafka的Topic。每個(gè)Consumer指定一個(gè)特定的ConsumerGroup,達(dá)到一條消息被多個(gè)不同的ConsumerGroup消費(fèi)。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安裝 :https://blog.csdn.net/zhangbeizhen18/arti

    2024年02月15日
    瀏覽(22)
  • 多個(gè)消費(fèi)者訂閱一個(gè)Kafka的Topic(使用KafkaConsumer和KafkaProducer)

    記錄 :466 場景 :一個(gè)KafkaProducer在一個(gè)Topic發(fā)布消息,多個(gè)消費(fèi)者KafkaConsumer訂閱Kafka的Topic。每個(gè)KafkaConsumer指定一個(gè)特定的ConsumerGroup,達(dá)到一條消息被多個(gè)不同的ConsumerGroup消費(fèi)。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安裝 :https://blog.csdn.net/zha

    2024年02月16日
    瀏覽(47)
  • Kafka/Spark-01消費(fèi)topic到寫出到topic

    Kafka/Spark-01消費(fèi)topic到寫出到topic

    消費(fèi)者代碼 注意點(diǎn) consumerConfigs是定義的可變的map的類型的,具體如下 consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是為了不限制groupId特意寫的傳參 是使用自帶的kafka工具類createDirectStream方法去消費(fèi)kafak 的數(shù)據(jù),詳細(xì)參數(shù)解釋如下 Subscribe傳參需要指定泛型,這邊指定string,

    2024年02月09日
    瀏覽(23)
  • Kafka某個(gè)Topic無法消費(fèi)問題

    Kafka某個(gè)Topic無法消費(fèi)問題

    12月28日,公司測試環(huán)境Kafka的task.build.metadata.flow這個(gè)topic突然無法消費(fèi)。 其他topic都正常使用,這個(gè)topic只有一個(gè)分區(qū),并且只有一個(gè)消費(fèi)者 首先登錄服務(wù)器,運(yùn)行kafka的cli命令,查看消費(fèi)者組的詳情。 由上圖可以發(fā)現(xiàn),task.build.metadata.flow這個(gè)topic,最新offset是2,但是當(dāng)前o

    2024年02月03日
    瀏覽(32)
  • Kafka - Topic 消費(fèi)狀態(tài)常用命令

    Kafka - Topic 消費(fèi)狀態(tài)常用命令

    replication-factor:指定副本數(shù)量 partitions:指定分區(qū) 查看consumer group列表有新、舊兩種命令,分別查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要區(qū)分指定bootstrap--server和zookeeper參數(shù) 這里同樣需要根據(jù)新、舊版本的consumer,分別指

    2024年01月25日
    瀏覽(30)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包