国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用spark進(jìn)行hbase的bulkload

這篇具有很好參考價(jià)值的文章主要介紹了使用spark進(jìn)行hbase的bulkload。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

使用spark進(jìn)行hbase的bulkload

一、 背景

HBase 是一個(gè)面向列,schemaless,高吞吐,高可靠可水平擴(kuò)展的 NoSQL 數(shù)據(jù)庫(kù),用戶可以通過 HBase client 提供的 put get 等 api 實(shí)現(xiàn)在數(shù)據(jù)的實(shí)時(shí)讀寫。在過去的幾年里,HBase 有了長(zhǎng)足的發(fā)展,它在越來越多的公司里扮演者越來越重要的角色。
HBase 擅長(zhǎng)于海量數(shù)據(jù)的實(shí)時(shí)讀取,原生 HBase 沒有二級(jí)索引,復(fù)雜查詢場(chǎng)景支持的不好。同時(shí)因?yàn)?split,磁盤,網(wǎng)絡(luò)抖動(dòng),Java GC 等多方面的因素會(huì)影響其 RT 表現(xiàn),所以通常我們?cè)谑褂肏Base的同時(shí)也會(huì)使用其他的存儲(chǔ)中間件,比如 ES,Reids,Mysql 等等。避免 HBase 成為信息孤島,我們需要數(shù)據(jù)導(dǎo)入導(dǎo)出的工具在這些中間件之間做數(shù)據(jù)遷移,而最常用的莫過于阿里開源的 DataX。Datax從 其他數(shù)據(jù)源遷移數(shù)據(jù)到 HBase 實(shí)際上是走的 HBase 原生 api 接口,在少量數(shù)據(jù)的情況下沒有問題,但當(dāng)我們需要從 Hive 里,或者其他異構(gòu)存儲(chǔ)里批量導(dǎo)入幾億,幾十億的數(shù)據(jù),那么用 DataX 這里就顯得不那么適合,因?yàn)樽咴涌跒榱吮苊庥绊懮a(chǎn)集群的穩(wěn)定性一定要做好限流,那么海量數(shù)據(jù)的遷移就很很慢,同時(shí)數(shù)據(jù)的持續(xù)寫入會(huì)因?yàn)?flush,compaction 等機(jī)制占用較多的系統(tǒng)資源。為了解決批量導(dǎo)入的場(chǎng)景,Bulkload 應(yīng)運(yùn)而生。

二、HBase Bulkload
在大量數(shù)據(jù)需要寫入HBase時(shí),通常有 put方式和bulkLoad 兩種方式。

1、put方式為單條插入,在put數(shù)據(jù)時(shí)會(huì)先將數(shù)據(jù)的更新操作信息和數(shù)據(jù)信息 寫入WAL ,
在寫入到WAL后, 數(shù)據(jù)就會(huì)被放到MemStore中 ,當(dāng)MemStore滿后數(shù)據(jù)就會(huì)被 flush到磁盤
(即形成HFile文件) ,在這種寫操作過程會(huì)涉及到flush、split、compaction等操作,容易造
成節(jié)點(diǎn)不穩(wěn)定,數(shù)據(jù)導(dǎo)入慢,耗費(fèi)資源等問題,在海量數(shù)據(jù)的導(dǎo)入過程極大的消耗了系統(tǒng)
性能,避免這些問題最好的方法就是使用BulkLoad的方式來加載數(shù)據(jù)到HBase中。

使用spark進(jìn)行hbase的bulkload,數(shù)據(jù)庫(kù),spark,hbase


2、BulkLoader利用HBase數(shù)據(jù)按照HFile格式存儲(chǔ)在HDFS的原理,使用MapReduce直接批量
生成HFile格式文件后,RegionServers再將HFile文件移動(dòng)到相應(yīng)的Region目錄下。

  • Extract,異構(gòu)數(shù)據(jù)源數(shù)據(jù)導(dǎo)入到 HDFS 之上。
  • Transform,通過用戶代碼,可以是 MR 或者 Spark 任務(wù)將數(shù)據(jù)轉(zhuǎn)化為 HFile。
  • Load,HFile 通過 loadIncrementalHFiles 調(diào)用將 HFile 放置到 Region 對(duì)應(yīng)的 HDFS 目錄上,該過程可能涉及到文件切分。

使用spark進(jìn)行hbase的bulkload,數(shù)據(jù)庫(kù),spark,hbase

?三、實(shí)踐

hive表

使用spark進(jìn)行hbase的bulkload,數(shù)據(jù)庫(kù),spark,hbase


?

?hbase表

使用spark進(jìn)行hbase的bulkload,數(shù)據(jù)庫(kù),spark,hbase

?依賴

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

<properties>

????????<maven.compiler.source>1.8</maven.compiler.source>

????????<maven.compiler.target>1.8</maven.compiler.target>

????????<encoding>UTF-8</encoding>

????????<hadoop.version>2.6.0-cdh5.16.2</hadoop.version>

????????<log4j.version>1.7.30</log4j.version>

????????<zk.version>3.4.5-cdh5.16.2</zk.version>

????????<scala.version>2.12.10</scala.version>

????????<scala.tools.version>2.12</scala.tools.version>

????????<spark.version>3.2.0</spark.version>

????????<hbase.version>1.2.0-cdh5.16.2</hbase.version>

????????<config.version>1.4.0</config.version>

????</properties>

?????

????<repositories>

????????<repository>

????????????<id>nexus-aliyun</id>

????????????<url>http://maven.aliyun.com/nexus/content/groups/public</url>

????????</repository>

????????<repository>

????????????<id>cloudera</id>

????????????<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>

????????</repository>

????</repositories>

????<dependencies>

????????<dependency>

????????????<groupId>org.apache.hadoop</groupId>

????????????<artifactId>hadoop-client</artifactId>

????????????<version>${hadoop.version}</version>

????????</dependency>

????????<dependency>

????????????<groupId>org.slf4j</groupId>

????????????<artifactId>slf4j-log4j12</artifactId>

????????????<version>${log4j.version}</version>

????????</dependency>

????????<dependency>

????????????<groupId>org.apache.zookeeper</groupId>

????????????<artifactId>zookeeper</artifactId>

????????????<version>${zk.version}</version>

????????</dependency>

????????<dependency>

????????????<groupId>org.apache.spark</groupId>

????????????<artifactId>spark-core_${scala.tools.version}</artifactId>

????????????<version>${spark.version}</version>

????????</dependency>

????????<dependency>

????????????<groupId>org.apache.hbase</groupId>

????????????<artifactId>hbase-client</artifactId>

????????????<version>${hbase.version}</version>

????????</dependency>

????????<dependency>

????????????<groupId>org.apache.hbase</groupId>

????????????<artifactId>hbase-server</artifactId>

????????????<version>${hbase.version}</version>

????????</dependency>

????????<dependency>

????????????<groupId>org.apache.spark</groupId>

????????????<artifactId>spark-sql_${scala.tools.version}</artifactId>

????????????<version>${spark.version}</version>

????????</dependency>

?????????

????</dependencies>

spark 代碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

package com.jojo

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, TableName}

import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**

?* Description:Hbase批量加載?? 同一列族多列

?*/

object?HbaseBulkLoadApp {

??val zookeeperQuorum =?"cdh01,cdh02,cdh03"//zookeeper信息

??val dataSourcePath =?"hdfs://cdh03:8020/user/hive/warehouse/sample_07"?//源文件

??val hFilePath =?"hdfs://cdh03:8020/tmp/result"//hfile的存儲(chǔ)路徑

??val hdfsRootPath =?"hdfs://cdh03:8020/"//根路徑

??val tableName =?"sample_07"//表名

??val familyName =?"basic"//列族

??val arr = Array("code","description",?"total_emp","salary")//列的名字集合

??def main(args: Array[String]): Unit = {

????//獲取content

????val sparkConf =?new?SparkConf()

??????.setAppName(s"${this.getClass.getSimpleName}")

??????.setMaster("local")

??????//指定序列化格式,默認(rèn)是java序列化

??????.set("spark.serializer",?"org.apache.spark.serializer.KryoSerializer")

??????//告知哪些類型需要序列化

??????.registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))

????val sc =?new?SparkContext(sparkConf)

????//hadoop配置

????val hadoopConf =?new?Configuration()

????hadoopConf.set("fs.defaultFS", hdfsRootPath)

????//獲取輸出路徑

????val fileSystem = FileSystem.get(hadoopConf)

????//獲取hbase配置

????val hconf = HBaseConfiguration.create()

????//設(shè)置zookeeper集群

????hconf.set("hbase.zookeeper.quorum", zookeeperQuorum)

????//設(shè)置端口

????hconf.set("hbase.zookeeper.property.clientPort",?"2181");

????//設(shè)置hfile最大個(gè)數(shù)

????hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily","3200")

????//設(shè)置hfile的大小

????hconf.set("hbase.hregion.max.filesize","10737418240")

????hconf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

????//獲取hbase連接

????val hbaseConn = ConnectionFactory.createConnection(hconf)

????val admin = hbaseConn.getAdmin

????/**

?????* 保存生成的HFile文件

?????* 注:bulk load? 生成的HFile文件需要落地

?????* 然后再通過LoadIncrementalHFiles類load進(jìn)Hbase

?????* 此處關(guān)于? sortBy 操作詳解:

?????* 0. Hbase查詢是根據(jù)rowkey進(jìn)行查詢的,并且rowkey是有序,

?????* 某種程度上來說rowkey就是一個(gè)索引,這是Hbase查詢高效的一個(gè)原因,

?????* 這就要求我們?cè)诓迦霐?shù)據(jù)的時(shí)候,要插在rowkey該在的位置。

?????* 1. Put方式插入數(shù)據(jù),會(huì)有WAL,同時(shí)在插入Hbase的時(shí)候會(huì)根據(jù)RowKey的值選擇合適的位置,此方式本身就可以保證RowKey有序

?????* 2. bulk load 方式?jīng)]有WAL,它更像是hive通過load方式直接將底層文件HFile移動(dòng)到制定的Hbase路徑下,所以,在不東HFile的情況下,要保證本身有序才行

?????* 之前寫的時(shí)候只要rowkey有序即可,但是2.0.2版本的時(shí)候發(fā)現(xiàn)clounm也要有序,所以會(huì)有sortBy(x => (x._1, x._2.getKeyString), true)

?????*

?????* @param hfileRDD

?????*/

????// 0. 準(zhǔn)備程序運(yùn)行的環(huán)境

????// 如果 HBase 表不存在,就創(chuàng)建一個(gè)新表

????if?(!admin.tableExists(TableName.valueOf(tableName))) {

??????val desc =?new?HTableDescriptor(TableName.valueOf(tableName))

??????val hcd =?new?HColumnDescriptor(familyName)

??????desc.addFamily(hcd)

??????admin.createTable(desc)

??????print("創(chuàng)建了一個(gè)新表")

????}

????// 如果存放 HFile文件的路徑已經(jīng)存在,就刪除掉

????if(fileSystem.exists(new?Path(hFilePath))) {

??????fileSystem.delete(new?Path(hFilePath),?true)

??????print("刪除hdfs上存在的路徑")

????}

????// 1. 清洗需要存放到 HFile 中的數(shù)據(jù),rowKey 一定要排序,否則會(huì)報(bào)錯(cuò):

????// java.io.IOException: Added a key not lexically larger than previous.

????val data: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = sc.textFile(dataSourcePath)

??????.map(row => {

????????// 處理數(shù)據(jù)的邏輯

????????val arrs = row.split("\t")

????????var?kvlist: Seq[KeyValue] = List()//存儲(chǔ)多個(gè)列

????????var?rowkey: Array[Byte] =?null

????????var?cn: Array[Byte] =?null

????????var?v: Array[Byte] =?null

????????var?kv: KeyValue =?null

????????val cf = familyName.getBytes?//列族

????????rowkey = Bytes.toBytes(arrs(0))?//key

????????for?(i <- 1 to (arrs.length - 1)) {

??????????cn = arr(i).getBytes()?//列的名稱

??????????v = Bytes.toBytes(arrs(i))?//列的值

??????????//將rdd轉(zhuǎn)換成HFile需要的格式,上面定義了Hfile的key是ImmutableBytesWritable,那么我們定義的RDD也是要以ImmutableBytesWritable的實(shí)例為key

??????????kv =?new?KeyValue(rowkey, cf, cn, v)?//封裝一下 rowkey, cf, clounmVale, value

??????????kvlist = kvlist :+ kv?//將新的kv加在kvlist后面(不能反 需要整體有序)

????????}

????????(new?ImmutableBytesWritable(rowkey), kvlist)

??????})

????val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = data

??????.flatMapValues(_.iterator)

????// 2. Save Hfiles on HDFS

????val table = hbaseConn.getTable(TableName.valueOf(tableName))

????val job = Job.getInstance(hconf)

????job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])

????job.setMapOutputValueClass(classOf[KeyValue])

????HFileOutputFormat2.configureIncrementalLoadMap(job, table)

????hfileRDD

??????.sortBy(x => (x._1, x._2.getKeyString),?true)?//要保持 整體有序

??????.saveAsNewAPIHadoopFile(hFilePath,

????????classOf[ImmutableBytesWritable],

????????classOf[KeyValue],

????????classOf[HFileOutputFormat2],

????????hconf)

????print("成功生成HFILE")

????val bulkLoader =?new?LoadIncrementalHFiles(hconf)

????val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))

????bulkLoader.doBulkLoad(new?Path(hFilePath), admin, table, regionLocator)

????hbaseConn.close()

????sc.stop()

??}

}

 其中可能遇到的問題:

1

EndOfStreamException: Unable to read additional data?from?server sessionid 0x17f44ca01833e45, likely server has closed socket

 解決:

  主要是zk的版本不匹配,在依賴選擇匹配的zk版本。

輸出結(jié)果

使用spark進(jìn)行hbase的bulkload,數(shù)據(jù)庫(kù),spark,hbase

https://www.cnblogs.com/huangguoming/articles/12967868.html文章來源地址http://www.zghlxwxcb.cn/news/detail-604613.html

到了這里,關(guān)于使用spark進(jìn)行hbase的bulkload的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 大數(shù)據(jù)之使用Spark全量抽取MySQL的數(shù)據(jù)到Hive數(shù)據(jù)庫(kù)

    前言 一、讀題分析 二、使用步驟 1.導(dǎo)入配置文件到pom.xml 2.代碼部分 三、重難點(diǎn)分析 總結(jié) 本題來源于全國(guó)職業(yè)技能大賽之大數(shù)據(jù)技術(shù)賽項(xiàng)賽題-離線數(shù)據(jù)處理-數(shù)據(jù)抽?。ㄆ渌麜翰煌嘎叮?題目:編寫Scala代碼,使用Spark將MySQL的shtd_industry庫(kù)中表EnvironmentData,ChangeRecord,BaseMach

    2024年02月11日
    瀏覽(21)
  • 如何使用PyQt進(jìn)行數(shù)據(jù)庫(kù)操作?

    首先,我們要知道,PyQt是一個(gè)非常強(qiáng)大的圖形用戶界面(GUI)開發(fā)庫(kù),它允許我們使用Python語言創(chuàng)建美觀且高度交互的桌面應(yīng)用程序。然而,對(duì)于數(shù)據(jù)庫(kù)操作,PyQt并不直接提供此類功能。這需要我們使用其他的數(shù)據(jù)庫(kù)庫(kù),例如SQLite、MySQL或PostgreSQL等。 對(duì)于新手來說,我建議

    2024年02月11日
    瀏覽(89)
  • 【MySQL】使用DBeaver數(shù)據(jù)庫(kù)管理工具進(jìn)行MySQL數(shù)據(jù)庫(kù)連接

    【MySQL】使用DBeaver數(shù)據(jù)庫(kù)管理工具進(jìn)行MySQL數(shù)據(jù)庫(kù)連接

    一、數(shù)據(jù)庫(kù)連接信息填寫 1、服務(wù)器地址:填寫服務(wù)器部署的地址,以及端口號(hào) 2、數(shù)據(jù)庫(kù):sys 3、用戶名:root 4、密碼:服務(wù)器上面設(shè)置的具體密碼 以上信息填寫錯(cuò)誤的報(bào)錯(cuò)提示 :Access denied for user ‘XXX’@’%’ to database ‘10.42.67.22’ 二、數(shù)據(jù)庫(kù)說明 1、數(shù)據(jù)庫(kù)連接時(shí)選擇的

    2024年02月09日
    瀏覽(109)
  • 【大數(shù)據(jù)】分布式數(shù)據(jù)庫(kù)HBase

    【大數(shù)據(jù)】分布式數(shù)據(jù)庫(kù)HBase

    目錄 1.概述 1.1.前言 1.2.數(shù)據(jù)模型 1.3.列式存儲(chǔ)的優(yōu)勢(shì) 2.實(shí)現(xiàn)原理 2.1.region 2.2.LSM樹 2.3.完整讀寫過程 2.4.master的作用 本文式作者大數(shù)據(jù)系列專欄中的一篇文章,按照專欄來閱讀,循序漸進(jìn)能更好的理解,專欄地址: https://blog.csdn.net/joker_zjn/category_12631789.html?spm=1001.2014.3001.5482 當(dāng)

    2024年04月27日
    瀏覽(29)
  • HBase 與 NoSQL 數(shù)據(jù)庫(kù)對(duì)比:了解 HBase 在大數(shù)據(jù)領(lǐng)域的優(yōu)勢(shì)
  • Hbase的bulkload流程與實(shí)踐

    一、前言 ??通常 MapReduce 在寫 HBase 時(shí)使用的是 HTableOutputFormat 方式,在 reduce 中直接生成 put 對(duì)象寫入 HBase ,該方式在大數(shù)據(jù)量寫入時(shí)效率低下(HBase 會(huì) block 寫入,頻繁進(jìn)行 flush、split、compact 等大量 IO 操作),并對(duì) HBase 節(jié)點(diǎn)的穩(wěn)定性造成一定的影響(GC 時(shí)間過長(zhǎng),響應(yīng)變

    2024年02月10日
    瀏覽(21)
  • 分布式數(shù)據(jù)庫(kù)HBase

    分布式數(shù)據(jù)庫(kù)HBase

    HBase是一個(gè)高可靠、高性能、 面向列 、可伸縮的分布式數(shù)據(jù)庫(kù),是谷歌BigTable的開源實(shí)現(xiàn),主要用來存儲(chǔ)非結(jié)構(gòu)化和把結(jié)構(gòu)化的松散數(shù)據(jù)。 HBase的目標(biāo)是處理非常龐大的表,可以通過水平擴(kuò)展的方式,利用 廉價(jià)計(jì)算機(jī)集群 處理由超過10億行數(shù)據(jù)和數(shù)百萬列元素組成的數(shù)據(jù)表。

    2024年02月09日
    瀏覽(25)
  • Python讀取hbase數(shù)據(jù)庫(kù)

    Python讀取hbase數(shù)據(jù)庫(kù)

    1. hbase連接 首先用hbase shell 命令來進(jìn)入到hbase數(shù)據(jù)庫(kù),然后用list命令來查看hbase下所有表,以其中表“DB_level0”為例,可以看到庫(kù)名“baotouyiqi”是拼接的,python代碼訪問時(shí)先連接: 備注:完整代碼在最后,想運(yùn)行的直接滑倒最后復(fù)制即可 2. 按條件讀取hbase數(shù)據(jù) 然后按照條件

    2024年04月09日
    瀏覽(16)
  • 大數(shù)據(jù)NoSQL數(shù)據(jù)庫(kù)HBase集群部署

    大數(shù)據(jù)NoSQL數(shù)據(jù)庫(kù)HBase集群部署

    目錄 1.? 簡(jiǎn)介 2.? 安裝 1. HBase依賴Zookeeper、JDK、Hadoop(HDFS),請(qǐng)確保已經(jīng)完成前面 2. 【node1執(zhí)行】下載HBase安裝包 3. 【node1執(zhí)行】,修改配置文件,修改conf/hbase-env.sh文件 4. 【node1執(zhí)行】,修改配置文件,修改conf/hbase-site.xml文件 5. 【node1執(zhí)行】,修改配置文件,修改conf/regi

    2024年02月08日
    瀏覽(19)
  • 如何使用 PHP 進(jìn)行數(shù)據(jù)庫(kù)連接池優(yōu)化?

    連接池是一個(gè)存放數(shù)據(jù)庫(kù)連接的地方,就像一個(gè)水池,你在這里可以得到數(shù)據(jù)庫(kù)連接。這比每次都新建和關(guān)閉連接要快得多,因?yàn)檫B接池中的連接是可以重復(fù)使用的。 下面是一個(gè)簡(jiǎn)單的例子,展示如何使用PHP和PDO(PHP Data Objects)來創(chuàng)建一個(gè)連接池。 這個(gè)類有一個(gè)連接池,其

    2024年02月15日
    瀏覽(37)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包