I. Introduction
When MapReduce writes to HBase, it normally uses HTableOutputFormat: Put objects are built in the reducer and written straight into HBase. For large data volumes this is inefficient (HBase blocks the writes and performs frequent flush, split, and compact operations, generating heavy I/O), and it can compromise the stability of the HBase nodes (long GC pauses slow responses, nodes time out and drop out of the cluster, and a chain reaction follows). HBase also supports a bulk load path: since HBase stores its data on HDFS in a well-defined file format, you can generate persistent HFiles directly on HDFS and then move them into place, completing a fast import of massive amounts of data. Driven by MapReduce, this is efficient and convenient; it does not consume region resources or add load during the write, and for large volumes it dramatically improves write throughput while reducing write pressure on the HBase nodes.
Generating HFiles first and then BulkLoad-ing them into HBase, instead of calling HTableOutputFormat directly, has the following benefits:
(1) It removes the insert pressure on the HBase cluster.
(2) It makes the job run faster and shortens its execution time.
II. BulkLoad Workflow and Practice
1. Case 1:
The bulkload approach needs two cooperating jobs:
(1) The first job runs the original business logic as before, but instead of writing its results to HBase through HTableOutputFormat, it writes them to an intermediate directory on HDFS (e.g. middata).
(2) The second job takes the first job's output (middata) as input and converts it into HFiles, HBase's underlying storage format.
(3) BulkLoad is then invoked to import the HFiles produced by the second job into the target HBase table.
The corresponding sample code is given below:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class GeneratePutHFileAndBulkLoadToHBase {

    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text wordText = new Text();
        private IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each input line on spaces and emit <word, 1>
            String line = value.toString();
            String[] wordArray = line.split(" ");
            for (String word : wordArray) {
                wordText.set(word);
                context.write(wordText, one);
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> valueList, Context context)
                throws IOException, InterruptedException {
            // Sum up the counts for each word
            int sum = 0;
            for (IntWritable value : valueList) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static class ConvertWordCountOutToHFileMapper
            extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String wordCountStr = value.toString();
            String[] wordCountArray = wordCountStr.split("\t");
            String word = wordCountArray[0];
            int count = Integer.valueOf(wordCountArray[1]);
            // Build the HBase row key
            byte[] rowKey = Bytes.toBytes(word);
            ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(rowKey);
            byte[] family = Bytes.toBytes("cf");
            byte[] qualifier = Bytes.toBytes("count");
            byte[] hbaseValue = Bytes.toBytes(count);
            // Put is for writing several columns of a column family at once;
            // with a single column, a KeyValue can be used instead:
            // KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);
            Put put = new Put(rowKey);
            put.add(family, qualifier, hbaseValue);
            context.write(rowKeyWritable, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration hadoopConfiguration = new Configuration();
        String[] dfsArgs = new GenericOptionsParser(hadoopConfiguration, args).getRemainingArgs();

        // The first job is an ordinary MR job that writes to the given output directory
        Job job = new Job(hadoopConfiguration, "wordCountJob");
        job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(dfsArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(dfsArgs[1]));
        // Submit the first job
        int wordCountJobResult = job.waitForCompletion(true) ? 0 : 1;

        // The second job takes the first job's output as input. Only a Mapper is needed:
        // it parses the first job's output and converts it into the KeyValue/Put form HBase expects.
        Job convertWordCountJobOutputToHFileJob = new Job(hadoopConfiguration, "wordCount_bulkload");
        convertWordCountJobOutputToHFileJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
        convertWordCountJobOutputToHFileJob.setMapperClass(ConvertWordCountOutToHFileMapper.class);
        // No ReducerClass needs to be set: the framework picks KeyValueSortReducer or
        // PutSortReducer based on the MapOutputValueClass.
        // convertWordCountJobOutputToHFileJob.setReducerClass(KeyValueSortReducer.class);
        convertWordCountJobOutputToHFileJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
        convertWordCountJobOutputToHFileJob.setMapOutputValueClass(Put.class);
        // The first job's output is the second job's input
        FileInputFormat.addInputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[1]));
        FileOutputFormat.setOutputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[2]));

        // Create the HBase configuration object
        Configuration hbaseConfiguration = HBaseConfiguration.create();
        // Create the target table object
        HTable wordCountTable = new HTable(hbaseConfiguration, "word_count");
        HFileOutputFormat.configureIncrementalLoad(convertWordCountJobOutputToHFileJob, wordCountTable);
        // Submit the second job
        int convertWordCountJobOutputToHFileJobResult =
                convertWordCountJobOutputToHFileJob.waitForCompletion(true) ? 0 : 1;

        // Once the second job finishes, use BulkLoad to import the MR output in bulk
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConfiguration);
        // First argument: the second job's output directory (where the HFiles are); second: the target table
        loader.doBulkLoad(new Path(dfsArgs[2]), wordCountTable);
        // Exit with the second job's status
        System.exit(convertWordCountJobOutputToHFileJobResult);
    }
}
For example, suppose the raw input directory is /rawdata/test/wordcount/20131212, the intermediate results are kept under /middata/test/wordcount/20131212, and the generated HFiles are written to /resultdata/test/wordcount/20131212. The jobs above are then run as:
hadoop jar test.jar /rawdata/test/wordcount/20131212 /middata/test/wordcount/20131212 /resultdata/test/wordcount/20131212
(1) Notes and caveats
(1) The HFile approach is the fastest of all loading methods, but with one precondition: the data is being imported for the first time and the table is empty. If the table already holds data, importing HFiles into it can trigger split operations.
(2) In the final output, whether it comes from the map or the reduce side, the key and value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put>. Otherwise you get an error like:
java.lang.IllegalArgumentException: Can't read partitions file
...
Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable
(3) In the final output, the value type is KeyValue or Put, and the matching sorter is KeyValueSortReducer or PutSortReducer respectively. This sort reducer does not have to be specified, because the source code already chooses it:
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);
} else {
    LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}
(4) The MR example uses job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat can only organize a single column family into HFiles per run, so multiple column families require multiple jobs. Newer versions of HBase (the original author wrote this four years ago; I have not checked exactly how far the latest releases have come) have lifted this limitation; a sketch of the newer API follows.
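As a rough sketch only, assuming HBase 1.x/2.x where HFileOutputFormat2 together with a RegionLocator replaces the older HFileOutputFormat/HTable pair (the table name word_count is carried over from the example above), the second job could be wired like this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;

public class ConfigureBulkLoadJob {
    public static void configure(Job job) throws Exception {
        Configuration hbaseConf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(hbaseConf);
             Table table = connection.getTable(TableName.valueOf("word_count"));
             RegionLocator locator = connection.getRegionLocator(TableName.valueOf("word_count"))) {
            // HFileOutputFormat2 writes one subdirectory per column family under the
            // output path, so a table with several families no longer needs one job per family.
            HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
        }
    }
}
The rest of the job setup (mapper, map output key/value classes, input and output paths) stays the same as in the example above.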
(5) The HFiles produced at the end of the MR example are stored on HDFS, with one subdirectory per column family under the output path. When the HFiles are loaded into HBase, they are effectively moved into HBase's regions, and the column-family subdirectories are left empty afterwards.
(6) setNumReduceTasks is not called for the last reduce, because the framework configures the number of reducers automatically from the number of regions.
(7) In the configuration below, the commented-out lines are optional either way: a look at the source code shows that configureIncrementalLoad already sets all the fixed configuration, and only the non-fixed parts need to be set by hand.
public class HFileOutput {
    // Job configuration (excerpt from the original article; several identifiers
    // such as INPUT_FAMILY, inputTable, outputTable, row, tmp and count are elided there)
    public static Job configureJob(Configuration conf) throws IOException {
        Job job = new Job(conf, "countUnite1");
        job.setJarByClass(HFileOutput.class);
        // job.setNumReduceTasks(2);
        // job.setOutputKeyClass(ImmutableBytesWritable.class);
        // job.setOutputValueClass(KeyValue.class);
        // job.setOutputFormatClass(HFileOutputFormat.class);
        Scan scan = new Scan();
        scan.setCaching(10);
        scan.addFamily(INPUT_FAMILY);
        TableMapReduceUtil.initTableMapperJob(inputTable, scan,
                HFileOutputMapper.class, ImmutableBytesWritable.class, LongWritable.class, job);
        // If no reducer is set here, the framework automatically falls back to
        // KeyValueSortReducer.class or PutSortReducer.class
        job.setReducerClass(HFileOutputReducer.class);
        // job.setOutputFormatClass(HFileOutputFormat.class);
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, outputTable));
        HFileOutputFormat.setOutputPath(job, new Path()); // output path elided in the original
        // FileOutputFormat.setOutputPath(job, new Path()); // equivalent to the line above
        return job;
    }

    public static class HFileOutputMapper extends
            TableMapper<ImmutableBytesWritable, LongWritable> {
        public void map(ImmutableBytesWritable key, Result values,
                Context context) throws IOException, InterruptedException {
            // mapper logic goes here; the arguments below are placeholders in the original
            context.write(new ImmutableBytesWritable(Bytes()), LongWritable());
        }
    }

    public static class HFileOutputReducer extends
            Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
        public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            // reducer logic goes here; row, OUTPUT_FAMILY, tmp and count are placeholders
            KeyValue kv = new KeyValue(row, OUTPUT_FAMILY, tmp[1].getBytes(),
                    Bytes.toBytes(count));
            context.write(key, kv);
        }
    }
}
The content above comes from the article "HBase 寫優(yōu)化之 BulkLoad 實(shí)現(xiàn)數(shù)據(jù)快速入庫" (HBase write optimization: fast data loading with BulkLoad).
(2) Hands-on practice
[hadoop@h71 ~]$ vi he.txt
hello world
hello hadoop
hello hive
[hadoop@h71 ~]$ hadoop fs -mkdir /rawdata
[hadoop@h71 ~]$ hadoop fs -put he.txt /rawdata
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/javac GeneratePutHFileAndBulkLoadToHBase.java
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar GeneratePutHFileAndBulkLoadToHBase*class
[hadoop@h71 hui]$ hadoop jar xx.jar GeneratePutHFileAndBulkLoadToHBase /rawdata /middata /resultdata
This fails with:
Exception in thread "main" java.lang.IllegalArgumentException: No regions passed
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.writePartitions(HFileOutputFormat2.java:315)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.configurePartitioner(HFileOutputFormat2.java:573)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.configureIncrementalLoad(HFileOutputFormat2.java:421)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.configureIncrementalLoad(HFileOutputFormat2.java:386)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.configureIncrementalLoad(HFileOutputFormat.java:90)
at TestHFileToHBase.main(TestHFileToHBase.java:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
It turns out the table has to be created first, and then the previous command re-run:
hbase(main):031:0> create 'word_count','cf'
[hadoop@h71 ~]$ hadoop fs -lsr /middata
-rw-r--r-- 2 hadoop supergroup 0 2017-03-20 10:36 /middata/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 32 2017-03-20 10:36 /middata/part-r-00000
[hadoop@h71 ~]$ hadoop fs -cat /middata/part-r-00000
hadoop 1
hello 3
hive 1
world 1
[hadoop@h71 ~]$ hadoop fs -lsr /resultdata
-rw-r--r-- 2 hadoop supergroup 0 2017-03-20 10:36 /resultdata/_SUCCESS
drwxr-xr-x - hadoop supergroup 0 2017-03-20 10:36 /resultdata/cf
# The cf directory here is empty because bulkload moves the HFile-format files under the given directory into HBase. Right after the MR job generates them, the cf directory does contain HFile-format files; they cannot be viewed with hadoop fs -cat (you would only see gibberish).
hbase(main):012:0> scan 'word_count'
ROW COLUMN+CELL
hadoop column=cf:count, timestamp=1489973703632, value=\x00\x00\x00\x01
hello column=cf:count, timestamp=1489973703632, value=\x00\x00\x00\x03
hive column=cf:count, timestamp=1489973703632, value=\x00\x00\x00\x01
world column=cf:count, timestamp=1489973703632, value=\x00\x00\x00\x01
# The inserted values turned out to be raw bytes, so I changed put.add(family, qualifier, hbaseValue); in the code to put.add(family, qualifier, Bytes.toBytes("5"));
# Re-running the commands above gives:
hbase(main):032:0> scan 'word_count'
ROW COLUMN+CELL
hadoop column=cf:count, timestamp=1489977438537, value=5
hello column=cf:count, timestamp=1489977438537, value=5
hive column=cf:count, timestamp=1489977438537, value=5
world column=cf:count, timestamp=1489977438537, value=5
# Later I also changed int count=Integer.valueOf(wordCountArray[1]); to String count=wordCountArray[1];
# Re-running the commands above gives:
hbase(main):007:0> scan 'word_count'
ROW COLUMN+CELL
hadoop column=cf:count, timestamp=1489748145527, value=1
hello column=cf:count, timestamp=1489748145527, value=3
hive column=cf:count, timestamp=1489748145527, value=1
world column=cf:count, timestamp=1489748145527, value=1
Note: I do not understand why the original author insisted on an int — imported into HBase it just shows up as \x00\x00\x00\x01. Later I found an article worth reading on this: 【hbase】——bulk load導(dǎo)入數(shù)據(jù)時(shí)value=\x00\x00\x00\x01問題解析
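For context, a minimal sketch (using only the standard org.apache.hadoop.hbase.util.Bytes helpers) of why the int value renders that way in the shell, and why the reader has to know the original type to decode it:
import org.apache.hadoop.hbase.util.Bytes;

public class BytesEncodingDemo {
    public static void main(String[] args) {
        // Bytes.toBytes(int) serializes an int into 4 big-endian bytes, which the
        // HBase shell prints as escaped hex rather than as the character "1".
        byte[] asInt = Bytes.toBytes(1);
        byte[] asString = Bytes.toBytes("1");
        System.out.println(Bytes.toStringBinary(asInt));    // \x00\x00\x00\x01
        System.out.println(Bytes.toStringBinary(asString)); // 1
        // Decoding must match the type that was written.
        System.out.println(Bytes.toInt(asInt));             // 1
        System.out.println(Bytes.toString(asString));       // 1
    }
}
So storing the count as an int is not wrong in itself; scan simply shows the raw bytes, while storing it as a string makes the shell output human-readable.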
Recall the note above: in the final output the value type is KeyValue or Put, the matching sorter is KeyValueSortReducer or PutSortReducer respectively, and this sort reducer need not be specified because the source code already chooses it. So I wanted to switch from Put to KeyValue for the HFile output, and modified the ConvertWordCountOutToHFileMapper class as follows:
public static class ConvertWordCountOutToHFileMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String wordCountStr = value.toString();
        String[] wordCountArray = wordCountStr.split("\t");
        String word = wordCountArray[0];
        String count = wordCountArray[1];
        // Build the HBase row key
        byte[] rowKey = Bytes.toBytes(word);
        ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(rowKey);
        byte[] family = Bytes.toBytes("cf");
        byte[] qualifier = Bytes.toBytes("count");
        byte[] hbaseValue = Bytes.toBytes(count);
        // Put is for writing several columns of a column family at once;
        // with a single column, a KeyValue can be used instead
        KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);
        // Put put = new Put(rowKey);
        // put.add(family, qualifier, hbaseValue);
        context.write(rowKeyWritable, keyValue);
    }
}
Running the same command then fails with:
Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.hbase.client.Put, received org.apache.hadoop.hbase.KeyValue
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1078)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at GeneratePutHFileAndBulkLoadToHBase$ConvertWordCountOutToHFileMapper.map(GeneratePutHFileAndBulkLoadToHBase.java:89)
at GeneratePutHFileAndBulkLoadToHBase$ConvertWordCountOutToHFileMapper.map(GeneratePutHFileAndBulkLoadToHBase.java:67)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
I then changed convertWordCountJobOutputToHFileJob.setMapOutputValueClass(Put.class); in the main method to convertWordCountJobOutputToHFileJob.setMapOutputValueClass(KeyValue.class); and only then did it work. So the classes passed to these setters are not arbitrary after all; at first I had assumed they could be anything:
job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);   // the main class of this program
job.setMapperClass(WordCountMapper.class);                     // the map class of the first MR job
job.setReducerClass(WordCountReducer.class);                   // the reduce class of the first MR job
job.setOutputKeyClass(Text.class);                             // a framework class, not an arbitrary name
job.setOutputValueClass(IntWritable.class);                    // same as above
convertWordCountJobOutputToHFileJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);  // the main class of this program
convertWordCountJobOutputToHFileJob.setMapperClass(ConvertWordCountOutToHFileMapper.class);   // the map class of the second MR job
convertWordCountJobOutputToHFileJob.setMapOutputKeyClass(ImmutableBytesWritable.class);       // a framework class
For HBase's ImmutableBytesWritable type, printing it directly with System.out gives something that looks like a hex-encoded byte[]. Given an ImmutableBytesWritable aa, you normally first obtain the byte[] with byte[] bb = aa.get(), and then parse it into a String with String cc = Bytes.toString(bb).
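A minimal sketch of that conversion, assuming only the ImmutableBytesWritable and Bytes classes already used above:
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

public class ImmutableBytesWritableDemo {
    public static void main(String[] args) {
        ImmutableBytesWritable aa = new ImmutableBytesWritable(Bytes.toBytes("hello"));
        byte[] bb = aa.get();            // unwrap the backing byte[]
        String cc = Bytes.toString(bb);  // decode the bytes as a string
        System.out.println(cc);          // prints: hello
    }
}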
2. Case 2:
(1) Generating the HFiles with MR
[hadoop@h71 hui]$ vi TestHFileToHBase.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TestHFileToHBase {

    public static class TestHFileToHBaseMapper extends Mapper {

        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split(" ", 2);
            byte[] row = Bytes.toBytes(values[0]);
            ImmutableBytesWritable k = new ImmutableBytesWritable(row);
            KeyValue kvProtocol = new KeyValue(row, "PROTOCOLID".getBytes(), "PROTOCOLID".getBytes(),
                    values[1].getBytes());
            context.write(k, kvProtocol);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "TestHFileToHBase");
        job.setJarByClass(TestHFileToHBase.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);
        job.setMapperClass(TestHFileToHBaseMapper.class);
        job.setReducerClass(KeyValueSortReducer.class);
        // job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat.class);
        // job.setNumReduceTasks(4);
        // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class);
        // HBaseAdmin admin = new HBaseAdmin(conf);
        HTable table = new HTable(conf, "hua");
        HFileOutputFormat.configureIncrementalLoad(job, table);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[hadoop@h71 ~]$ vi he.txt
hello world
hello hadoop
hello hive
[hadoop@h71 ~]$ hadoop fs -mkdir /rawdata
[hadoop@h71 ~]$ hadoop fs -put he.txt /rawdata
hbase(main):020:0> create 'hua','PROTOCOLID'
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/javac TestHFileToHBase.java
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar TestHFileToHBase*class
[hadoop@h71 hui]$ hadoop jar xx.jar TestHFileToHBase /rawdata /middata
This fails with:
Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.hbase.io.ImmutableBytesWritable, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Fix:
# So I changed
public static class TestHFileToHBaseMapper extends Mapper {
# to
public static class TestHFileToHBaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>{
# and then it worked.
[hadoop@h71 ~]$ hadoop fs -lsr /middata
drwxr-xr-x - hadoop supergroup 0 2017-03-17 20:50 /middata/PROTOCOLID
-rw-r--r-- 2 hadoop supergroup 1142 2017-03-17 20:50 /middata/PROTOCOLID/65493afaefac43528c554d0b8056f1e3
-rw-r--r-- 2 hadoop supergroup 0 2017-03-17 20:50 /middata/_SUCCESS
(/middata/PROTOCOLID/65493afaefac43528c554d0b8056f1e3 is an HFile-format file; it cannot be viewed with hadoop fs -cat, or you only get gibberish.)
(2) Loading the HFiles into HBase
The code in the original article had quite a few problems; after fixing it, it looks like this:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.GenericOptionsParser;
public class TestLoadIncrementalHFileToHBase {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String[] dfsArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        HTable table = new HTable(conf, "hua");
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(dfsArgs[0]), table);
    }
}
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/javac TestLoadIncrementalHFileToHBase.java
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/java TestLoadIncrementalHFileToHBase /middata/PROTOCOLID
After running this, the hua table showed no data in the hbase shell, because doBulkLoad expects the job's output directory (whose subdirectories are the column families), not the column-family directory itself. So run instead:
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/java TestLoadIncrementalHFileToHBase /middata
hbase(main):073:0> scan 'hua'
ROW COLUMN+CELL
hello column=PROTOCOLID:PROTOCOLID, timestamp=1489758507378, value=hive
(At first I was puzzled that the hua table ended up with only one row when he.txt has three lines. Then it clicked: the mapper takes the first word of each line as the rowkey, so all three lines of he.txt share the rowkey hello and overwrite one another.)
The content above comes from the article 生成HFile以及入庫到HBase (generating HFiles and loading them into HBase).
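Following up on the row-key collision just mentioned, here is a purely hypothetical variant of the mapper (the class name UniqueRowKeyMapper and the "_offset" suffix are my own, not from the original article) that keeps every line by appending the line's byte offset to the row key:
import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical variant of TestHFileToHBaseMapper: append the line's byte offset
// (the map input key) to the row key so that lines sharing the same first word
// ("hello ...") no longer overwrite each other.
public class UniqueRowKeyMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] values = value.toString().split(" ", 2);
        byte[] row = Bytes.toBytes(values[0] + "_" + key.get()); // e.g. "hello_0", "hello_12"
        KeyValue kv = new KeyValue(row, "PROTOCOLID".getBytes(), "PROTOCOLID".getBytes(),
                values[1].getBytes());
        context.write(new ImmutableBytesWritable(row), kv);
    }
}
Whether such a composite rowkey makes sense depends entirely on how the table will be queried; the sketch is only meant to show why the three lines collapsed into one.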
3. Case 3: a Scala program run on Spark
To avoid all the data being written into a single region and skewing HBase, create a pre-split table on the node where the HMaster is currently active:
create 'userprofile_labels', { NAME => "f", BLOCKCACHE => "true", BLOOMFILTER => "ROWCOL", COMPRESSION => 'snappy', IN_MEMORY => 'true' }, { NUMREGIONS => 10, SPLITALGO => 'HexStringSplit' }
Write the data to be synchronized into HFiles, which store their data as key-value pairs, and then bulk-load the HFiles into the HBase cluster with BulkLoad. The Scala job is as follows:
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

object Hive2HBase {
  def main(args: Array[String]): Unit = {
    // Arguments: the data date and the IP of the currently active master node
    val data_date = args(0)
    val node = args(1)

    val spark = SparkSession
      .builder()
      .appName("Hive2Hbase")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.storage.memoryFraction", "0.1")
      .config("spark.shuffle.memoryFraction", "0.7")
      .config("spark.memory.useLegacyMode", "true")
      .enableHiveSupport()
      .getOrCreate()

    // Create the HBase configuration
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "10.xxx.xxx.xxx, 10.xxx.xxx.xxx")
    conf.set("hbase.zookeeper.property.clientPort", "8020")
    // Raise the limits so the import does not fail when there are too many HFiles
    conf.set("hbase.hregion.max.filesize", "10737418240")
    conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 3200)

    val Data = spark.sql(
      s"select userid, userlabels from dw.userprofile_usergroup_labels_all where data_date='${data_date}'")
    val dataRdd = Data.rdd.flatMap(row => {
      val rowkey = row.getAs[String]("userid".toLowerCase)
      val tagsmap = row.getAs[Map[String, Object]]("userlabels".toLowerCase)
      // Flatten the map structure: key -> value becomes "k1:k2:..." / "v1:v2:..."
      val sbkey = new StringBuffer()
      val sbvalue = new StringBuffer()
      for ((key, value) <- tagsmap) {
        sbkey.append(key + ":")
        val labelght = if (value == "") {
          "-999999"
        } else {
          value
        }
        sbvalue.append(labelght + ":")
      }
      val item = sbkey.substring(0, sbkey.length - 1)
      val score = sbvalue.substring(0, sbvalue.length - 1)
      Array(
        (rowkey, ("f", "i", item)),
        (rowkey, ("f", "s", score))
      )
    })

    // Convert the RDD into the (ImmutableBytesWritable, KeyValue) form HFiles require
    val rdds = dataRdd.filter(x => x._1 != null).sortBy(x => (x._1, x._2._1, x._2._2)).map(x => {
      // The KeyValue instance is the value
      val rowKey = Bytes.toBytes(x._1)
      val family = Bytes.toBytes(x._2._1)
      val colum = Bytes.toBytes(x._2._2)
      val value = Bytes.toBytes(x._2._3.toString)
      (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, colum, value))
    })

    // Where the HFiles are written on HDFS
    val locatedir = "hdfs://" + node.toString + ":8020/user/bulkload/hfile/usergroup_HBase_" + data_date
    // Generate the HFiles under locatedir
    rdds.saveAsNewAPIHadoopFile(locatedir,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      conf)

    // Bulk-load the HFiles into HBase
    val load = new LoadIncrementalHFiles(conf)
    // The HBase table name
    val tableName = "userprofile_labels"
    // Create the HBase connection; the master address is read from the default configuration
    val conn = ConnectionFactory.createConnection(conf)
    // Get the table by name
    val table = conn.getTable(TableName.valueOf(tableName))
    try {
      // Get the region distribution of the HBase table
      val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))
      // Create a Hadoop MapReduce job
      val job = Job.getInstance(conf)
      // The job name can be anything
      job.setJobName("Hive2HBase")
      // The map output value type is KeyValue
      job.setMapOutputValueClass(classOf[KeyValue])
      // The map output key type must be ImmutableBytesWritable
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      // Configure the job for incremental load against the table's regions
      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
      // Start the import
      load.doBulkLoad(new Path(locatedir), conn.getAdmin, table, regionLocator)
    } finally {
      table.close()
      conn.close()
    }
    spark.close()
  }
}
The content above comes from the book 《用戶畫像方法論與工程化解決方案》 (User Profiling: Methodology and Engineering Solutions).
III. Supplement: Region Splits
References: hbase的split策略和預(yù)分區(qū) and 【原創(chuàng)】HBase的分裂(Split)與緊縮(Compaction)
Region splitting is one of HBase's core capabilities and the basis of its horizontal scalability. Initially a table has a single region; as data keeps arriving, HBase splits a region into two child regions according to a trigger condition and a split policy and then removes the parent region, and the balancer spreads the resulting regions across the RegionServers. If a table is not pre-split it has only one region, so all early reads and writes hit the same RegionServer and create a hotspot; the region is unavailable while it is splitting, and frequent splits also cause heavy cluster I/O and poor performance. The common ways HBase regions get split are:
- Pre-splitting
- Auto-splitting
- Forced splitting
Pre-splitting means specifying the number of regions when the table is created, so that several regions exist from the start. This avoids the write-hotspot problem (with only one region, the other RegionServers sit idle under heavy writes), reduces the chance of region splits, saves cluster resources (I/O, network), and avoids the read/write disruption caused by regions being taken offline during a split.
By default a new table has one region whose rowkey range is unbounded, i.e. it has no StartKey and no EndKey. To pre-split, you have to configure the split points so HBase knows at which rowkeys to cut. HBase ships two pre-split algorithms: HexStringSplit, suited to rowkeys made of hexadecimal characters, and UniformSplit, suited to rowkeys that are random byte arrays.
# Create a table named hex_test with two column families (info and desc), keeping 3 versions,
# with region replication 2, pre-split into 10 regions using the HexStringSplit algorithm
create 'hex_test',{NAME=>'info',VERSIONS=>3},{NAME=>'desc',VERSIONS=>3},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit',REGION_REPLICATION=>2}
# The same with UniformSplit
create 'uniform_test',{NAME=>'info',VERSIONS=>3},{NAME=>'desc',VERSIONS=>3},{NUMREGIONS=>10,SPLITALGO=>'UniformSplit',REGION_REPLICATION=>2}
Auto-splitting means that as data keeps being written, regions keep growing and HBase splits them automatically according to the configured split policy. HBase currently ships six split-trigger policies, and different versions configure different defaults.
# Forced split
split 'forced_table', 'b'   # forced_table is the table to split, 'b' is the split point
We can also specify our own split policy through hbase.regionserver.region.split.policy:
<!-- class that performs the scheduled split -->
<property>
    <name>hbase.regionserver.region.split.policy</name>
    <value>org.apache.hadoop.hbase.regionserver.TimingRegionSplitPolicy</value>
</property>
<!-- scheduled split time window -->
<property>
    <name>hbase.regionserver.region.split.startTime</name>
    <value>02:00:00</value>
</property>
<property>
    <name>hbase.regionserver.region.split.endTime</name>
    <value>04:00:00</value>
</property>