国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

HBase Shell操作&Flink寫入HBase

這篇具有很好參考價(jià)值的文章主要介紹了HBase Shell操作&Flink寫入HBase。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、HBase Shell操作

1、基本操作

1)進(jìn)入HBase客戶端命令行

    [root@bigdata1 hbase]$ bin/hbase shell

2)查看幫助命令

    hbase(main):001:0> help

3)查看當(dāng)前數(shù)據(jù)庫中有哪些表

    hbase(main):002:0> list

2、表的操作

1)創(chuàng)建表

    hbase(main):002:0> create 'student','info'

2)插入數(shù)據(jù)到表

    hbase(main):003:0> put 'student','1001','info:sex','male'
    hbase(main):004:0> put 'student','1001','info:age','18'
    hbase(main):005:0> put 'student','1002','info:name','Janna'
    hbase(main):006:0> put 'student','1002','info:sex','female'
    hbase(main):007:0> put 'student','1002','info:age','20'

3)掃描查看表數(shù)據(jù)

    hbase(main):008:0> scan 'student'
    hbase(main):009:0> scan 'student',{STARTROW => '1001', STOPROW  => '1001'}
    hbase(main):010:0> scan 'student',{STARTROW => '1001'}

4)查看表結(jié)構(gòu)

    hbase(main):011:0> describe 'student'

5)更新指定字段的數(shù)據(jù)

    hbase(main):012:0> put 'student','1001','info:name','Nick'
    hbase(main):013:0> put 'student','1001','info:age','100'

6)查看“指定行”或“指定列族:列”的數(shù)據(jù)

    hbase(main):014:0> get 'student','1001'
    hbase(main):015:0> get 'student','1001','info:name'

7)統(tǒng)計(jì)表數(shù)據(jù)行數(shù)

    hbase(main):021:0> count 'student'

8)刪除數(shù)據(jù)

    刪除某rowkey的全部數(shù)據(jù):
    hbase(main):016:0> deleteall 'student','1001'
    刪除某rowkey的某一列數(shù)據(jù):
    hbase(main):017:0> delete 'student','1002','info:sex'

9)清空表數(shù)據(jù)

    hbase(main):018:0> truncate 'student'
    提示:清空表的操作順序?yàn)橄萪isable,然后再truncate。

10)刪除表

    首先需要先讓該表為disable狀態(tài):
    hbase(main):019:0> disable 'student'
    然后才能drop這個(gè)表:
    hbase(main):020:0> drop 'student'
    提示:如果直接drop表,會(huì)報(bào)錯(cuò):ERROR: Table student is enabled. Disable it first.

11)變更表信息

    將info列族中的數(shù)據(jù)存放3個(gè)版本:
    hbase(main):022:0> alter 'student',{NAME=>'info',VERSIONS=>3}
    hbase(main):022:0> get 'student','1001',{COLUMN=>'info:name',VERSIONS=>3}

二、Flink整合HBase寫入操作

現(xiàn)在需要將Flink處理的數(shù)據(jù)存入HBase數(shù)據(jù)庫(namespace)shtd_result的order_info表中,rowkey為id的值,然后在Linux的HBase shell命令行中查詢列consignee,并查詢出任意5條

表空間為:shtd_result,表為order_info,列族為:info
表結(jié)構(gòu)為:

字段 類型 注釋
rowkey string HBase的主鍵,值為id
id bigint
consignee string
consignee_tel string
final_total_amount double
order_status string
user_id bigint
delivery_address string
order_comment string
out_trade_no string
trade_body string
create_time string 轉(zhuǎn)成yyyy-MM-dd hh:mm:ss格式的的字符串
operate_time string 轉(zhuǎn)成yyyy-MM-dd hh:mm:ss格式的的字符串
expire_time string 轉(zhuǎn)成yyyy-MM-dd hh:mm:ss格式的的字符串
tracking_no string
parent_order_id bigint
img_url string
province_id int
benefit_reduce_amount double

我們需要寫一個(gè)WriteToHBase類,集成自RichSinkFunction,RichSinkFunction 是一個(gè)抽象類,提供了一個(gè)更為豐富的接口,用于實(shí)現(xiàn)自定義的 Sink(接收器)功能。

在Scala api中RichSinkFunction的主要方法有open,invoke以及close。

  • open(Configuration parameters):

    這個(gè)方法在 Sink 函數(shù)初始化時(shí)被調(diào)用,通常用于一次性的設(shè)置工作,例如打開數(shù)據(jù)庫連接或初始化狀態(tài)。
    參數(shù) parameters 提供了訪問 Flink 配置的能力。

  • invoke(value: T, context: SinkFunction.Context):

    這是核心方法,用于處理每條流入的數(shù)據(jù)。
    value 參數(shù)代表當(dāng)前的數(shù)據(jù)元素。
    context 提供了此元素的上下文信息,如當(dāng)前的處理時(shí)間或事件時(shí)間。

  • close():

    當(dāng) Sink 不再接收數(shù)據(jù)時(shí)調(diào)用此方法,用于執(zhí)行清理工作,如關(guān)閉數(shù)據(jù)庫連接。
    這個(gè)方法是在最后一次調(diào)用 invoke 方法后執(zhí)行。

了解了這些方法后,我們來寫一下WriteToHBase

一、WriteToHBase的實(shí)現(xiàn)

  class WriteToHBase extends RichSinkFunction[OrderData] {
    @transient private var connection: Connection = _
    @transient private var table: Table = _

    override def open(parameters: Configuration): Unit = {
      val config = HBaseConfiguration.create()
      // 設(shè)置HBase配置, 如Zookeeper地址等
       config.set("hbase.zookeeper.quorum", "bigdata1:2181")

      connection = ConnectionFactory.createConnection(config)
      table = connection.getTable(TableName.valueOf("shtd_result:order_info"))
    }

    override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
      // 將 id 轉(zhuǎn)換為行鍵(假設(shè) id 是唯一的)
      val rowKey = Bytes.toBytes(value.id.toString)

      // 為該行創(chuàng)建一個(gè)新的 Put 實(shí)例
      val put = new Put(rowKey)

      // 向 Put 實(shí)例中添加列
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("consignee"), Bytes.toBytes(value.consignee))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("consignee_tel"), Bytes.toBytes(value.consignee_tel))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("final_total_amount"), Bytes.toBytes(value.final_total_amount))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("order_status"), Bytes.toBytes(value.order_status))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_id"), Bytes.toBytes(value.user_id))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("delivery_address"), Bytes.toBytes(value.delivery_address))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("order_comment"), Bytes.toBytes(value.order_comment))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("out_trade_no"), Bytes.toBytes(value.out_trade_no))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("trade_body"), Bytes.toBytes(value.trade_body))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("operate_time"), Bytes.toBytes(value.operate_time))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("expire_time"), Bytes.toBytes(value.expire_time))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("tracking_no"), Bytes.toBytes(value.tracking_no))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("parent_order_id"), Bytes.toBytes(value.parent_order_id))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("img_url"), Bytes.toBytes(value.img_url))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("province_id"), Bytes.toBytes(value.province_id))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("benefit_reduce_amount"), Bytes.toBytes(value.benefit_reduce_amount))

      table.put(put)
    }

    override def close(): Unit = {
      if (table != null) {
        table.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
  }

在 Scala 和 Java 中,@transient 關(guān)鍵字用于標(biāo)記一個(gè)類的成員變量為“暫時(shí)的”(transient),這意味著這個(gè)變量不會(huì)被默認(rèn)的序列化過程序列化。

在 Flink中,通常用于:

  • 防止序列化問題:當(dāng)一個(gè)對(duì)象需要在不同的機(jī)器或上下文中傳遞時(shí),某些屬性可能不支持序列化(例如,數(shù)據(jù)庫連接),或者序列化這些屬性沒有意義(例如,臨時(shí)緩存)。使用 @transient 可以避免這些字段在對(duì)象序列化時(shí)引發(fā)錯(cuò)誤。

  • 減少網(wǎng)絡(luò)開銷:對(duì)于不需要跨節(jié)點(diǎn)傳輸?shù)淖侄?,使?@transient 可以減少不必要的網(wǎng)絡(luò)傳輸開銷。

在 WriteToHBase 類中,connection 和 table 作為 HBase 的連接和表實(shí)例,通常不支持序列化,也不應(yīng)該被序列化。所以,它們被標(biāo)記為 @transient。


上面的向 Put 實(shí)例中添加列過于冗長,可以用反射來代替:

def addColumnsUsingReflection[T: TypeTag](put: Put, cf: String, data: T): Unit = {
  //獲取運(yùn)行時(shí)鏡像和實(shí)例鏡像
  val mirror = runtimeMirror(getClass.getClassLoader)
  val instanceMirror = mirror.reflect(data) 
  //獲取類成員并過濾方法
  val members = typeOf[T].members.sorted.filterNot(_.isMethod)

  //遍歷字段并添加到 Put 實(shí)例
  members.foreach { m =>
    val fieldMirror = instanceMirror.reflectField(m.asTerm)
    val name = m.name.toString.trim
    val value = fieldMirror.get.toString

    put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(name), Bytes.toBytes(value))
  }
}

override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
  val rowKey = Bytes.toBytes(value.id.toString)
  val put = new Put(rowKey)
  val infoCF = "info"

  // 使用反射自動(dòng)添加列
  addColumnsUsingReflection(put, infoCF, value)

  table.put(put)
}

現(xiàn)在來解釋一下這段代碼:

addColumnsUsingReflection 函數(shù)定義

def addColumnsUsingReflection[T: TypeTag](put: Put, cf: String, data: T): Unit = {
  • [T: TypeTag]:類型參數(shù) T,帶有一個(gè)上下文界定 TypeTag,這使得可以在運(yùn)行時(shí)獲取類型 T 的信息。
  • (put: Put, cf: String, data: T):函數(shù)接受三個(gè)參數(shù):put 是 HBase 的 Put 實(shí)例,cf 是列族名,data 是要插入的數(shù)據(jù)對(duì)象。

獲取運(yùn)行時(shí)鏡像和實(shí)例鏡像

  val mirror = runtimeMirror(getClass.getClassLoader)
  val instanceMirror = mirror.reflect(data)
  • val mirror:創(chuàng)建一個(gè) Mirror 實(shí)例,它是反射 API 的入口點(diǎn)。
  • runtimeMirror(getClass.getClassLoader):獲取當(dāng)前類的類加載器的運(yùn)行時(shí)鏡像。
  • val instanceMirror:反射 data 對(duì)象,得到一個(gè)可以用來訪問 data 實(shí)例成員的 InstanceMirror。

獲取類成員并過濾方法

  val members = typeOf[T].members.sorted.filterNot(_.isMethod)
  • typeOf[T]:獲取類型 T 的類型信息。
  • .members:獲取類型 T 的所有成員(字段和方法)。
  • .sorted:對(duì)成員進(jìn)行排序(默認(rèn)按名稱)。
  • .filterNot(_.isMethod):過濾掉方法成員,只保留字段。

遍歷字段并添加到 Put 實(shí)例

  members.foreach { m =>
    val fieldMirror = instanceMirror.reflectField(m.asTerm)
    val name = m.name.toString.trim
    val value = fieldMirror.get.toString
    put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(name), Bytes.toBytes(value))
  }
  • 遍歷所有字段。
  • val fieldMirror:反射每個(gè)字段,得到一個(gè)可以操作字段的 FieldMirror。
  • m.asTerm:將成員 m 轉(zhuǎn)換為一個(gè) term(字段)。
  • val name:獲取字段名,并去除首尾空格。
  • val value:獲取字段的值并轉(zhuǎn)換為字符串。
  • put.addColumn:向 Put 實(shí)例添加列,列族為 cf,列名為字段名 name,值為字段值 value

invoke 方法

override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
  val rowKey = Bytes.toBytes(value.id.toString)
  val put = new Put(rowKey)
  val infoCF = "info"
  addColumnsUsingReflection(put, infoCF, value)
  table.put(put)
}
  • override def invoke:重寫 RichSinkFunctioninvoke 方法。
  • val rowKey:將 OrderDataid 字段轉(zhuǎn)換為字節(jié)作為行鍵。
  • val put:創(chuàng)建一個(gè)新的 Put 實(shí)例。
  • val infoCF:定義列族名。
  • addColumnsUsingReflection:調(diào)用之前定義的函數(shù)來動(dòng)態(tài)添加列。
  • table.put(put):將 Put 實(shí)例寫入 HBase 表。

這段代碼通過反射自動(dòng)化了向 HBase Put 實(shí)例添加數(shù)據(jù)的過程,避免了手動(dòng)為每個(gè)字段編寫重復(fù)代碼的需要。

然后我們需要對(duì)于dataStream應(yīng)用剛才寫的 WriteToHBase 類

應(yīng)用 WriteToHBase 類

dataStream.addSink(new WriteToHBase)

二、HBase Shell操作

1. 啟動(dòng) HBase Shell

首先,我們需要進(jìn)入 HBase Shell。在命令行中輸入:

hbase shell

2. 創(chuàng)建命名空間

如果命名空間 shtd_result 還不存在,需要先創(chuàng)建它。在 HBase Shell 中執(zhí)行以下命令:

create_namespace 'shtd_result'

3. 創(chuàng)建表

接著,創(chuàng)建表 order_info。我們需要定義至少一個(gè)列族(在這個(gè)示例中,我將使用 info 作為列族名)。在 HBase Shell 中執(zhí)行以下命令:

create 'shtd_result:order_info', 'info'

這里,'shtd_result:order_info' 指定了完整的表名(包括命名空間),而 'info' 是列族名。

4. 驗(yàn)證表創(chuàng)建

最后,您可以列出所有表來驗(yàn)證新表是否已成功創(chuàng)建:

list

5. 查詢

scan 'shtd_result:order_info', {COLUMNS => ['info:consignee'], LIMIT => 5}

這里的 scan 命令用于掃描 shtd_result:order_info 表,COLUMNS 參數(shù)指定我們只關(guān)心 info:consignee 列(假設(shè) consignee 存儲(chǔ)在名為 info 的列族中),而 LIMIT => 5 指定我們只查看 5 條記錄。文章來源地址http://www.zghlxwxcb.cn/news/detail-760505.html

到了這里,關(guān)于HBase Shell操作&Flink寫入HBase的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • HBase Shell 操作

    HBase Shell 操作

    1.1、進(jìn)入HBase客戶端命令行 前提是先啟動(dòng)hadoop集群和zookeeper集群。 1.2、查看幫助命令 help 查看指定命令的語法規(guī)則 查看 list_namespace 的用法(‘記得加單引號(hào)’) 我們首先查看現(xiàn)在有哪些命名空間 ,使用指令:list_namespace 注意:我的hadoop版本3.3.0,HBase2.4.17,兩者兼容性有問

    2024年02月13日
    瀏覽(50)
  • HBase基礎(chǔ)及shell操作

    HBase基礎(chǔ)及shell操作

    HBase是采用java語言編寫的一款 apache 開源的基于HDFS的NoSQL型數(shù)據(jù)庫,不支持 SQL,不支持事務(wù),不支持Join操作,沒有表關(guān)系 1.不支持事務(wù) 2.主要存儲(chǔ)結(jié)構(gòu)化數(shù)據(jù)以及半結(jié)構(gòu)化的數(shù)據(jù) 3.?HBase中數(shù)據(jù)存儲(chǔ)都是以 字節(jié) 的形式來存儲(chǔ)的 4.HBase是易于擴(kuò)展的 1- 大: 在一個(gè)表中可以存儲(chǔ)上

    2023年04月08日
    瀏覽(25)
  • HBase(11):shell管理操作

    HBase(11):shell管理操作

    1 status 例如:顯示服務(wù)器狀態(tài) ? 2 whoami 顯示HBase當(dāng)前用戶,例如: ? 3 list 顯示當(dāng)前所有的表 4 count 統(tǒng)計(jì)指定表的記錄數(shù),例如: ? ? 5 describe 展示表結(jié)構(gòu)信息 ? 6 exists 檢查表是否存在,適用于表量特別多的情況

    2024年02月12日
    瀏覽(29)
  • HBase Shell基本操作

    HBase Shell基本操作

    先在Linux Shell命令行終端執(zhí)行 start-dfs.sh 腳本啟動(dòng)HDFS,再執(zhí)行 start-hbase.sh 腳本啟動(dòng)HBase。如果Linux系統(tǒng)已配置HBase環(huán)境變量,可直接在任意目錄下執(zhí)行 hbase shell 腳本命令,就可進(jìn)入HBase Shell的命令行終端環(huán)境, exit 可以退出HBase Shell(我安裝的是偽分布式的HBase)。 (1) help幫

    2024年04月13日
    瀏覽(23)
  • HBase高手之路4-Shell操作

    HBase高手之路4-Shell操作

    命令 功能 create 創(chuàng)建表 put 插入或者更新數(shù)據(jù) get 獲取限定行或者列的數(shù)據(jù) scan 全表掃描或掃描表并返回表的數(shù)據(jù) describe 查看表的結(jié)構(gòu) count 統(tǒng)計(jì)行數(shù) delete 刪除指定的行或列的數(shù)據(jù) deleteall 刪除整個(gè)行或者列的數(shù)據(jù) truncate 刪除表的數(shù)據(jù),結(jié)構(gòu)還在 drop 刪除整個(gè)表(包括數(shù)據(jù))

    2023年04月17日
    瀏覽(32)
  • 第1關(guān):HBase Shell 操作:分區(qū)壓縮

    第1關(guān):HBase Shell 操作:分區(qū)壓縮

    任務(wù)描述 本關(guān)任務(wù):在 HBase Shell 中使用分區(qū)壓縮命令并將查看到的命令結(jié)果復(fù)制到指定的文件中。 相關(guān)知識(shí) 為了完成本關(guān)任務(wù),你需要掌握: 1.數(shù)據(jù)分區(qū)壓縮的概念; 2.數(shù)據(jù)分區(qū)壓縮的原因; 3.數(shù)據(jù)分區(qū)壓縮的過程; 4.數(shù)據(jù)分區(qū)壓縮的觸發(fā)時(shí)機(jī); 5.數(shù)據(jù)分區(qū)壓縮的詳解。 數(shù)據(jù)分

    2024年04月13日
    瀏覽(111)
  • Hbase安裝和shell客戶端操作

    Hbase安裝和shell客戶端操作

    HBase 是一個(gè) 面向列式存儲(chǔ)的分布式數(shù)據(jù)庫 ,其設(shè)計(jì)思想來源于 Google 的 BigTable 論文。 HBase 底層存儲(chǔ)基于 HDFS 實(shí)現(xiàn),集群的管理基于 ZooKeeper 實(shí)現(xiàn)。 HBase 良好的分布式架構(gòu)設(shè)計(jì)為海量數(shù)據(jù)的快速存儲(chǔ)、隨機(jī)訪問提供了可能,基于數(shù)據(jù)副本機(jī)制和分區(qū)機(jī)制可以輕松實(shí)現(xiàn)在線擴(kuò)容

    2024年02月08日
    瀏覽(30)
  • 大數(shù)據(jù)----33.hbase中的shell文件操作

    大數(shù)據(jù)----33.hbase中的shell文件操作

    HBase的命令行工具,最簡(jiǎn)單的接口,適合HBase管理使用,可以使用shell命令來查詢HBase中數(shù)據(jù)的詳細(xì)情況。 注意:如果進(jìn)入hbase后長時(shí)間不操作; 發(fā)生hbase自動(dòng)關(guān)閉沒有了進(jìn)程; 原因是內(nèi)存不夠;可以關(guān)閉機(jī)器;增加內(nèi)存;虛擬機(jī)就擴(kuò)大內(nèi)存。 1、進(jìn)入 hbase 客戶端、幫助命令

    2024年01月24日
    瀏覽(27)
  • HBase Shell 操作:自動(dòng)拆分和預(yù)分區(qū)

    啟動(dòng)hadoop集群 start-all.sh 啟動(dòng)Zookeeper集群 zkServer.sh start 啟動(dòng)HBase start-hbase.sh 進(jìn)入hbase shell hbase shell 創(chuàng)建的表使用自動(dòng)拆分命令 create \\\'stu\\\',{METADATA={\\\'SPLIT_POLICY\\\'=\\\'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy\\\'}},{NAME=\\\'sc\\\'} 第二關(guān):預(yù)分區(qū) 具體預(yù)分區(qū)配置要求如下所述: 文本文件

    2024年04月10日
    瀏覽(21)
  • HBase Shell啟動(dòng)緩慢及操作耗時(shí)長的原因分析與解決

    在內(nèi)網(wǎng)搭了一個(gè) hbase-2.2.6(hadoop-2.7.3)的環(huán)境,使用的是其內(nèi)置的 zookeeper-3.4.10,16010端口對(duì)應(yīng)的 web界面可以正常訪問,且各項(xiàng)功能正常。 在使用 hbase shell的過程中,首先是 hbase shell啟動(dòng)非常慢,約 210s才成功,其次執(zhí)行 scan、put、get等命令需要 20s左右才能完成。以筆者的經(jīng)

    2024年02月02日
    瀏覽(45)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包