目錄
前言
題目:
一、讀題分析
二、處理過程
? 1.數(shù)據(jù)處理部分:
2.HBaseSink(未經(jīng)測試,不能證明其正確性,僅供參考?。?/p>
三、重難點(diǎn)分析
總結(jié)?
什么是HBase?
前言
本題來源于全國職業(yè)技能大賽之大數(shù)據(jù)技術(shù)賽項(xiàng)賽題 - 電商數(shù)據(jù)處理 - 實(shí)時(shí)數(shù)據(jù)處理
注:由于設(shè)備問題,代碼執(zhí)行結(jié)果以及數(shù)據(jù)的展示無法給出,可參照我以往的博客其中有相同數(shù)據(jù)源展示
題目:
????????使用Flink消費(fèi)Kafka中topic為ods_mall_log的數(shù)據(jù),根據(jù)數(shù)據(jù)中不同的表前綴區(qū)分,將數(shù)據(jù)分別分發(fā)至kafka的DWD層的dim_customer_login_log的Topic中,其他的表則無需處理;
提示:以下是本篇文章正文內(nèi)容,下面案例可供參考(使用Scala語言編寫)
?
一、讀題分析
涉及組件:Scala,F(xiàn)link,Kafka,HBase
涉及知識(shí)點(diǎn):
- Flink函數(shù)的使用
- 了解HBase,基本使用HBase
二、處理過程
? 1.數(shù)據(jù)處理部分:
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import java.util.Properties
object answer2 {
def main(args: Array[String]): Unit = {
import moduleC.test.HBaseSink2
import org.apache.flink.streaming.api.scala.DataStream
// 創(chuàng)建flink流環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 創(chuàng)建Kafka的配置
val properties = new Properties()
properties.setProperty("bootstrap.servers", "bigdata1:9092")
// 創(chuàng)建Kafka的消費(fèi)者
val kafkaConsumer = new FlinkKafkaConsumer[String]("ods_mall_log", new SimpleStringSchema(), properties)
// 讀取消費(fèi)的數(shù)據(jù)
val kafkaStream = env.addSource(kafkaConsumer)
val newStream: DataStream[(String, String)] = kafkaStream
.map(
line => {
val tablename = line.split(":")(0)
val data = line.split(":")(1).stripPrefix("(").stripSuffix(");")
// 12115|7611|0|0|'20230407111600'
(tablename, data)
}
).filter(_._1 == "customer_login_log")
newStream.print()
// 創(chuàng)建Kafka的生產(chǎn)者
val dwdKafkaProduce = new FlinkKafkaProducer[String]("dim_customer_login_log", new SimpleStringSchema(), properties)
// val dwdKafkaProduce2 = new FlinkKafkaProducer[String]("dwd-topic1", new SimpleStringSchema(), properties)
// 向Kafka發(fā)送數(shù)據(jù)
// newStream.addSink(dwdKafkaProduce) 不調(diào)用map方法會(huì)報(bào)錯(cuò)因?yàn)樾枰粋€(gè)string而不是(string,string)
val result = newStream.map(line => line._2)
result.addSink(dwdKafkaProduce)
// 發(fā)到HBase上
result.addSink(new HBaseSink2)
// execute
env.execute("flinkKafkaToKafka")
}
}
2.HBaseSink(未經(jīng)測試,不能證明其正確性,僅供參考?。?/h4>
package moduleC.test
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory
class HBaseSink(tableName: String, columnFamily: String) extends RichSinkFunction[String] {
private var connection: client.Connection = _
private var table: client.Table = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 創(chuàng)建 HBase 連接
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "bigdata1")
conf.set("hbase.zookeeper.property.clientPort", "2181")
connection = ConnectionFactory.createConnection(conf)
// HBase 表信息
table = connection.getTable(TableName.valueOf(tableName))
}
override def invoke(value: String, context: SinkFunction.Context): Unit = {
val put = new Put(Bytes.toBytes(value))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("data"), Bytes.toBytes(value))
table.put(put)
}
override def close(): Unit = {
if (table != null) table.close()
if (connection != null) connection.close()
}
}
三、重難點(diǎn)分析
package moduleC.test
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory
class HBaseSink(tableName: String, columnFamily: String) extends RichSinkFunction[String] {
private var connection: client.Connection = _
private var table: client.Table = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 創(chuàng)建 HBase 連接
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "bigdata1")
conf.set("hbase.zookeeper.property.clientPort", "2181")
connection = ConnectionFactory.createConnection(conf)
// HBase 表信息
table = connection.getTable(TableName.valueOf(tableName))
}
override def invoke(value: String, context: SinkFunction.Context): Unit = {
val put = new Put(Bytes.toBytes(value))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("data"), Bytes.toBytes(value))
table.put(put)
}
override def close(): Unit = {
if (table != null) table.close()
if (connection != null) connection.close()
}
}
本題的難點(diǎn)主要在以下幾個(gè)方面:
-
Flink對Kafka的集成:需要了解如何使用Flink作為Kafka消費(fèi)者來消費(fèi)Kafka中的數(shù)據(jù),并掌握Flink如何處理流式數(shù)據(jù)。
-
數(shù)據(jù)表的前綴區(qū)分:需要能夠識(shí)別數(shù)據(jù)中不同表的前綴,并根據(jù)前綴將數(shù)據(jù)分別分發(fā)到不同的DWD層Topic中。
-
Kafka的使用:需要了解Kafka的基本原理和API使用,能夠編寫代碼將數(shù)據(jù)發(fā)送到指定的DWD層Topic中。
-
數(shù)據(jù)處理:需要針對數(shù)據(jù)進(jìn)行必要的處理和轉(zhuǎn)換,以便將其發(fā)送到正確的目標(biāo)Topic中。
綜上,本題需要具備Flink、Kafka和數(shù)據(jù)處理的基本知識(shí)和編程能力,同時(shí)需要具備一定的調(diào)試能力,因此可能對初學(xué)者而言會(huì)有一定的難度。
?文章來源地址http://www.zghlxwxcb.cn/news/detail-768256.html
- 在根據(jù)前綴區(qū)分那塊,需要了解Flink對與數(shù)據(jù)拆解的方法和一些使用的技巧
- HBaseSink的編寫,這是一個(gè)難點(diǎn),需要了解HBaseSink的原理,相對于其他數(shù)據(jù)倉庫來說算比較復(fù)雜的了。使用前需要開啟Hadoop,它是基于Hadoop hdfs的。
總結(jié)?
什么是HBase?
????????HBase是一個(gè)Apache Hadoop生態(tài)系統(tǒng)中的分布式NoSQL數(shù)據(jù)庫。它是一個(gè)面向列的數(shù)據(jù)庫,旨在提供高度可擴(kuò)展性和可靠性,以便處理大規(guī)模數(shù)據(jù)集。HBase的設(shè)計(jì)靈感來自于Google的Bigtable論文,并且提供了類似于Bigtable的數(shù)據(jù)模型和API,但是開源和可擴(kuò)展。
????????HBase的數(shù)據(jù)存儲(chǔ)在Hadoop HDFS(Hadoop分布式文件系統(tǒng))上,并且可以在多個(gè)節(jié)點(diǎn)上分布式存儲(chǔ)和處理。它支持高并發(fā)讀寫操作,具有快速的隨機(jī)讀/寫訪問速度,并且能夠處理PB級別的數(shù)據(jù)集。
????????HBase的應(yīng)用場景包括互聯(lián)網(wǎng)應(yīng)用、日志處理、社交網(wǎng)絡(luò)、金融服務(wù)、電信和游戲等領(lǐng)域,為這些領(lǐng)域提供了一種高效處理大量數(shù)據(jù)的解決方案。
????????以上來自網(wǎng)絡(luò)
?文章來源:http://www.zghlxwxcb.cn/news/detail-768256.html
????????請關(guān)注我的大數(shù)據(jù)技術(shù)專欄大數(shù)據(jù)技術(shù) 作者: Eternity.Arrebol
????????請關(guān)注我獲取更多與大數(shù)據(jù)相關(guān)的文章Eternity.Arrebol的博客
Q-歡迎在評論區(qū)進(jìn)行交流-Q
?
?
到了這里,關(guān)于大數(shù)據(jù)之使用Flink消費(fèi)Kafka中topic為ods_mall_log的數(shù)據(jù),根據(jù)不同的表前綴區(qū)分在存入Kafka的topic當(dāng)中的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!