一、HBase Shell操作
1、基本操作
1)進(jìn)入HBase客戶端命令行
[root@bigdata1 hbase]$ bin/hbase shell
2)查看幫助命令
hbase(main):001:0> help
3)查看當(dāng)前數(shù)據(jù)庫中有哪些表
hbase(main):002:0> list
2、表的操作
1)創(chuàng)建表
hbase(main):002:0> create 'student','info'
2)插入數(shù)據(jù)到表
hbase(main):003:0> put 'student','1001','info:sex','male'
hbase(main):004:0> put 'student','1001','info:age','18'
hbase(main):005:0> put 'student','1002','info:name','Janna'
hbase(main):006:0> put 'student','1002','info:sex','female'
hbase(main):007:0> put 'student','1002','info:age','20'
3)掃描查看表數(shù)據(jù)
hbase(main):008:0> scan 'student'
hbase(main):009:0> scan 'student',{STARTROW => '1001', STOPROW => '1001'}
hbase(main):010:0> scan 'student',{STARTROW => '1001'}
4)查看表結(jié)構(gòu)
hbase(main):011:0> describe 'student'
5)更新指定字段的數(shù)據(jù)
hbase(main):012:0> put 'student','1001','info:name','Nick'
hbase(main):013:0> put 'student','1001','info:age','100'
6)查看“指定行”或“指定列族:列”的數(shù)據(jù)
hbase(main):014:0> get 'student','1001'
hbase(main):015:0> get 'student','1001','info:name'
7)統(tǒng)計(jì)表數(shù)據(jù)行數(shù)
hbase(main):021:0> count 'student'
8)刪除數(shù)據(jù)
刪除某rowkey的全部數(shù)據(jù):
hbase(main):016:0> deleteall 'student','1001'
刪除某rowkey的某一列數(shù)據(jù):
hbase(main):017:0> delete 'student','1002','info:sex'
9)清空表數(shù)據(jù)
hbase(main):018:0> truncate 'student'
提示:清空表的操作順序?yàn)橄萪isable,然后再truncate。
10)刪除表
首先需要先讓該表為disable狀態(tài):
hbase(main):019:0> disable 'student'
然后才能drop這個(gè)表:
hbase(main):020:0> drop 'student'
提示:如果直接drop表,會(huì)報(bào)錯(cuò):ERROR: Table student is enabled. Disable it first.
11)變更表信息
將info列族中的數(shù)據(jù)存放3個(gè)版本:
hbase(main):022:0> alter 'student',{NAME=>'info',VERSIONS=>3}
hbase(main):022:0> get 'student','1001',{COLUMN=>'info:name',VERSIONS=>3}
二、Flink整合HBase寫入操作
現(xiàn)在需要將Flink處理的數(shù)據(jù)存入HBase數(shù)據(jù)庫(namespace)shtd_result的order_info表中,rowkey為id的值,然后在Linux的HBase shell命令行中查詢列consignee,并查詢出任意5條
表空間為:shtd_result,表為order_info,列族為:info
表結(jié)構(gòu)為:
字段 | 類型 | 注釋 |
---|---|---|
rowkey | string | HBase的主鍵,值為id |
id | bigint | |
consignee | string | |
consignee_tel | string | |
final_total_amount | double | |
order_status | string | |
user_id | bigint | |
delivery_address | string | |
order_comment | string | |
out_trade_no | string | |
trade_body | string | |
create_time | string | 轉(zhuǎn)成yyyy-MM-dd hh:mm:ss格式的的字符串 |
operate_time | string | 轉(zhuǎn)成yyyy-MM-dd hh:mm:ss格式的的字符串 |
expire_time | string | 轉(zhuǎn)成yyyy-MM-dd hh:mm:ss格式的的字符串 |
tracking_no | string | |
parent_order_id | bigint | |
img_url | string | |
province_id | int | |
benefit_reduce_amount | double |
我們需要寫一個(gè)WriteToHBase類,集成自RichSinkFunction,RichSinkFunction 是一個(gè)抽象類,提供了一個(gè)更為豐富的接口,用于實(shí)現(xiàn)自定義的 Sink(接收器)功能。
在Scala api中RichSinkFunction的主要方法有open,invoke以及close。
-
open(Configuration parameters):
這個(gè)方法在 Sink 函數(shù)初始化時(shí)被調(diào)用,通常用于一次性的設(shè)置工作,例如打開數(shù)據(jù)庫連接或初始化狀態(tài)。
參數(shù) parameters 提供了訪問 Flink 配置的能力。 -
invoke(value: T, context: SinkFunction.Context):
這是核心方法,用于處理每條流入的數(shù)據(jù)。
value 參數(shù)代表當(dāng)前的數(shù)據(jù)元素。
context 提供了此元素的上下文信息,如當(dāng)前的處理時(shí)間或事件時(shí)間。 -
close():
當(dāng) Sink 不再接收數(shù)據(jù)時(shí)調(diào)用此方法,用于執(zhí)行清理工作,如關(guān)閉數(shù)據(jù)庫連接。
這個(gè)方法是在最后一次調(diào)用 invoke 方法后執(zhí)行。
了解了這些方法后,我們來寫一下WriteToHBase
一、WriteToHBase的實(shí)現(xiàn)
class WriteToHBase extends RichSinkFunction[OrderData] {
@transient private var connection: Connection = _
@transient private var table: Table = _
override def open(parameters: Configuration): Unit = {
val config = HBaseConfiguration.create()
// 設(shè)置HBase配置, 如Zookeeper地址等
config.set("hbase.zookeeper.quorum", "bigdata1:2181")
connection = ConnectionFactory.createConnection(config)
table = connection.getTable(TableName.valueOf("shtd_result:order_info"))
}
override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
// 將 id 轉(zhuǎn)換為行鍵(假設(shè) id 是唯一的)
val rowKey = Bytes.toBytes(value.id.toString)
// 為該行創(chuàng)建一個(gè)新的 Put 實(shí)例
val put = new Put(rowKey)
// 向 Put 實(shí)例中添加列
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("consignee"), Bytes.toBytes(value.consignee))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("consignee_tel"), Bytes.toBytes(value.consignee_tel))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("final_total_amount"), Bytes.toBytes(value.final_total_amount))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("order_status"), Bytes.toBytes(value.order_status))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_id"), Bytes.toBytes(value.user_id))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("delivery_address"), Bytes.toBytes(value.delivery_address))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("order_comment"), Bytes.toBytes(value.order_comment))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("out_trade_no"), Bytes.toBytes(value.out_trade_no))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("trade_body"), Bytes.toBytes(value.trade_body))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("operate_time"), Bytes.toBytes(value.operate_time))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("expire_time"), Bytes.toBytes(value.expire_time))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("tracking_no"), Bytes.toBytes(value.tracking_no))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("parent_order_id"), Bytes.toBytes(value.parent_order_id))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("img_url"), Bytes.toBytes(value.img_url))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("province_id"), Bytes.toBytes(value.province_id))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("benefit_reduce_amount"), Bytes.toBytes(value.benefit_reduce_amount))
table.put(put)
}
override def close(): Unit = {
if (table != null) {
table.close()
}
if (connection != null) {
connection.close()
}
}
}
在 Scala 和 Java 中,@transient 關(guān)鍵字用于標(biāo)記一個(gè)類的成員變量為“暫時(shí)的”(transient),這意味著這個(gè)變量不會(huì)被默認(rèn)的序列化過程序列化。
在 Flink中,通常用于:
-
防止序列化問題:當(dāng)一個(gè)對(duì)象需要在不同的機(jī)器或上下文中傳遞時(shí),某些屬性可能不支持序列化(例如,數(shù)據(jù)庫連接),或者序列化這些屬性沒有意義(例如,臨時(shí)緩存)。使用 @transient 可以避免這些字段在對(duì)象序列化時(shí)引發(fā)錯(cuò)誤。
-
減少網(wǎng)絡(luò)開銷:對(duì)于不需要跨節(jié)點(diǎn)傳輸?shù)淖侄?,使?@transient 可以減少不必要的網(wǎng)絡(luò)傳輸開銷。
在 WriteToHBase 類中,connection 和 table 作為 HBase 的連接和表實(shí)例,通常不支持序列化,也不應(yīng)該被序列化。所以,它們被標(biāo)記為 @transient。
上面的向 Put 實(shí)例中添加列過于冗長,可以用反射來代替:
def addColumnsUsingReflection[T: TypeTag](put: Put, cf: String, data: T): Unit = {
//獲取運(yùn)行時(shí)鏡像和實(shí)例鏡像
val mirror = runtimeMirror(getClass.getClassLoader)
val instanceMirror = mirror.reflect(data)
//獲取類成員并過濾方法
val members = typeOf[T].members.sorted.filterNot(_.isMethod)
//遍歷字段并添加到 Put 實(shí)例
members.foreach { m =>
val fieldMirror = instanceMirror.reflectField(m.asTerm)
val name = m.name.toString.trim
val value = fieldMirror.get.toString
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(name), Bytes.toBytes(value))
}
}
override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
val rowKey = Bytes.toBytes(value.id.toString)
val put = new Put(rowKey)
val infoCF = "info"
// 使用反射自動(dòng)添加列
addColumnsUsingReflection(put, infoCF, value)
table.put(put)
}
現(xiàn)在來解釋一下這段代碼:
addColumnsUsingReflection 函數(shù)定義
def addColumnsUsingReflection[T: TypeTag](put: Put, cf: String, data: T): Unit = {
-
[T: TypeTag]
:類型參數(shù)T
,帶有一個(gè)上下文界定TypeTag
,這使得可以在運(yùn)行時(shí)獲取類型T
的信息。 -
(put: Put, cf: String, data: T)
:函數(shù)接受三個(gè)參數(shù):put
是 HBase 的Put
實(shí)例,cf
是列族名,data
是要插入的數(shù)據(jù)對(duì)象。
獲取運(yùn)行時(shí)鏡像和實(shí)例鏡像
val mirror = runtimeMirror(getClass.getClassLoader)
val instanceMirror = mirror.reflect(data)
-
val mirror
:創(chuàng)建一個(gè)Mirror
實(shí)例,它是反射 API 的入口點(diǎn)。 -
runtimeMirror(getClass.getClassLoader)
:獲取當(dāng)前類的類加載器的運(yùn)行時(shí)鏡像。 -
val instanceMirror
:反射data
對(duì)象,得到一個(gè)可以用來訪問data
實(shí)例成員的InstanceMirror
。
獲取類成員并過濾方法
val members = typeOf[T].members.sorted.filterNot(_.isMethod)
-
typeOf[T]
:獲取類型T
的類型信息。 -
.members
:獲取類型T
的所有成員(字段和方法)。 -
.sorted
:對(duì)成員進(jìn)行排序(默認(rèn)按名稱)。 -
.filterNot(_.isMethod)
:過濾掉方法成員,只保留字段。
遍歷字段并添加到 Put
實(shí)例
members.foreach { m =>
val fieldMirror = instanceMirror.reflectField(m.asTerm)
val name = m.name.toString.trim
val value = fieldMirror.get.toString
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(name), Bytes.toBytes(value))
}
- 遍歷所有字段。
-
val fieldMirror
:反射每個(gè)字段,得到一個(gè)可以操作字段的FieldMirror
。 -
m.asTerm
:將成員m
轉(zhuǎn)換為一個(gè) term(字段)。 -
val name
:獲取字段名,并去除首尾空格。 -
val value
:獲取字段的值并轉(zhuǎn)換為字符串。 -
put.addColumn
:向Put
實(shí)例添加列,列族為cf
,列名為字段名name
,值為字段值value
。
invoke
方法
override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
val rowKey = Bytes.toBytes(value.id.toString)
val put = new Put(rowKey)
val infoCF = "info"
addColumnsUsingReflection(put, infoCF, value)
table.put(put)
}
-
override def invoke
:重寫RichSinkFunction
的invoke
方法。 -
val rowKey
:將OrderData
的id
字段轉(zhuǎn)換為字節(jié)作為行鍵。 -
val put
:創(chuàng)建一個(gè)新的Put
實(shí)例。 -
val infoCF
:定義列族名。 -
addColumnsUsingReflection
:調(diào)用之前定義的函數(shù)來動(dòng)態(tài)添加列。 -
table.put(put)
:將Put
實(shí)例寫入 HBase 表。
這段代碼通過反射自動(dòng)化了向 HBase Put 實(shí)例添加數(shù)據(jù)的過程,避免了手動(dòng)為每個(gè)字段編寫重復(fù)代碼的需要。
然后我們需要對(duì)于dataStream應(yīng)用剛才寫的 WriteToHBase 類
應(yīng)用 WriteToHBase 類
dataStream.addSink(new WriteToHBase)
二、HBase Shell操作
1. 啟動(dòng) HBase Shell
首先,我們需要進(jìn)入 HBase Shell。在命令行中輸入:
hbase shell
2. 創(chuàng)建命名空間
如果命名空間 shtd_result
還不存在,需要先創(chuàng)建它。在 HBase Shell 中執(zhí)行以下命令:
create_namespace 'shtd_result'
3. 創(chuàng)建表
接著,創(chuàng)建表 order_info
。我們需要定義至少一個(gè)列族(在這個(gè)示例中,我將使用 info
作為列族名)。在 HBase Shell 中執(zhí)行以下命令:
create 'shtd_result:order_info', 'info'
這里,'shtd_result:order_info'
指定了完整的表名(包括命名空間),而 'info'
是列族名。
4. 驗(yàn)證表創(chuàng)建
最后,您可以列出所有表來驗(yàn)證新表是否已成功創(chuàng)建:文章來源:http://www.zghlxwxcb.cn/news/detail-760505.html
list
5. 查詢
scan 'shtd_result:order_info', {COLUMNS => ['info:consignee'], LIMIT => 5}
這里的 scan
命令用于掃描 shtd_result:order_info
表,COLUMNS
參數(shù)指定我們只關(guān)心 info:consignee
列(假設(shè) consignee
存儲(chǔ)在名為 info
的列族中),而 LIMIT => 5
指定我們只查看 5 條記錄。文章來源地址http://www.zghlxwxcb.cn/news/detail-760505.html
到了這里,關(guān)于HBase Shell操作&Flink寫入HBase的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!