国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Hbase的bulkload流程與實(shí)踐

這篇具有很好參考價(jià)值的文章主要介紹了Hbase的bulkload流程與實(shí)踐。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、前言

??通常 MapReduce 在寫 HBase 時(shí)使用的是 HTableOutputFormat 方式,在 reduce 中直接生成 put 對象寫入 HBase,該方式在大數(shù)據(jù)量寫入時(shí)效率低下(HBase 會(huì) block 寫入,頻繁進(jìn)行 flush、split、compact 等大量 IO 操作),并對 HBase 節(jié)點(diǎn)的穩(wěn)定性造成一定的影響(GC 時(shí)間過長,響應(yīng)變慢,導(dǎo)致節(jié)點(diǎn)超時(shí)退出,并引起一系列連鎖反應(yīng)),而 HBase 支持 bulk load 的入庫方式,它是利用 hbase 的數(shù)據(jù)信息按照特定格式存儲(chǔ)在 hdfs 內(nèi)這一原理,直接在 HDFS 中生成持久化的 HFile 數(shù)據(jù)格式文件,然后上傳至合適位置,即完成巨量數(shù)據(jù)快速入庫的辦法。配合 Mapreduce 完成,高效便捷,而且不占用 region 資源,增添負(fù)載,在大數(shù)據(jù)量寫入時(shí)能極大的提高寫入效率,并降低對 HBase 節(jié)點(diǎn)的寫入壓力。

??通過使用先生成 HFile,然后再 BulkLoadHbase 的方式來替代之前直接調(diào)用 HTableOutputFormat 的方法有如下的好處:
??(1)消除了對 HBase 集群的插入壓力
??(2)提高了 Job 的運(yùn)行速度,降低了 Job 的執(zhí)行時(shí)間

二、Bulkload 流程與實(shí)踐

1. 案例一:

??bulkload 方式需要兩個(gè) Job 配合完成:
??(1)第一個(gè) Job 還是運(yùn)行原來業(yè)務(wù)處理邏輯,處理的結(jié)果不直接調(diào)用 HTableOutputFormat 寫入到 HBase,而是先寫入到 HDFS 上的一個(gè)中間目錄下(如 middata)
??(2)第二個(gè) Job 以第一個(gè) Job 的輸出(middata)做為輸入,然后將其格式化 HBase 的底層存儲(chǔ)文件 HFile
??(3)調(diào)用 BulkLoad 將第二個(gè) Job 生成的 HFile 導(dǎo)入到對應(yīng)的 HBase 表中
下面給出相應(yīng)的范例代碼:

import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class GeneratePutHFileAndBulkLoadToHBase {
 
	public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
	{
 
		private Text wordText=new Text();
		private IntWritable one=new IntWritable(1);
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			String line=value.toString();
			String[] wordArray=line.split(" ");
			for(String word:wordArray)
			{
				wordText.set(word);
				context.write(wordText, one);
			}
			
		}
	}
	
	public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
	{
 
		private IntWritable result=new IntWritable();
		protected void reduce(Text key, Iterable<IntWritable> valueList,
				Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			int sum=0;
			for(IntWritable value:valueList)
			{
				sum+=value.get();
			}
			result.set(sum);
			context.write(key, result);
		}
		
	}
	
	public static class ConvertWordCountOutToHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
	{
 
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			String wordCountStr=value.toString();
			String[] wordCountArray=wordCountStr.split("\t");
			String word=wordCountArray[0];
			int count=Integer.valueOf(wordCountArray[1]);
			
			//創(chuàng)建HBase中的RowKey
			byte[] rowKey=Bytes.toBytes(word);
			ImmutableBytesWritable rowKeyWritable=new ImmutableBytesWritable(rowKey);
			byte[] family=Bytes.toBytes("cf");
			byte[] qualifier=Bytes.toBytes("count");
			byte[] hbaseValue=Bytes.toBytes(count);
			// Put 用于列簇下的多列提交,若只有一個(gè)列,則可以使用 KeyValue 格式
			// KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);
			Put put=new Put(rowKey);
			put.add(family, qualifier, hbaseValue);
			context.write(rowKeyWritable, put);
			
		}
		
	}
	
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
        Configuration hadoopConfiguration=new Configuration();
        String[] dfsArgs = new GenericOptionsParser(hadoopConfiguration, args).getRemainingArgs();
		
        //第一個(gè)Job就是普通MR,輸出到指定的目錄
        Job job=new Job(hadoopConfiguration, "wordCountJob");
        job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(dfsArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(dfsArgs[1]));
        //提交第一個(gè)Job
        int wordCountJobResult=job.waitForCompletion(true)?0:1;
        
        //第二個(gè)Job以第一個(gè)Job的輸出做為輸入,只需要編寫Mapper類,在Mapper類中對一個(gè)job的輸出進(jìn)行分析,并轉(zhuǎn)換為HBase需要的KeyValue的方式。
        Job convertWordCountJobOutputToHFileJob=new Job(hadoopConfiguration, "wordCount_bulkload");
        
        convertWordCountJobOutputToHFileJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);
        convertWordCountJobOutputToHFileJob.setMapperClass(ConvertWordCountOutToHFileMapper.class);
		//ReducerClass 無需指定,框架會(huì)自行根據(jù) MapOutputValueClass 來決定是使用 KeyValueSortReducer 還是 PutSortReducer
		//convertWordCountJobOutputToHFileJob.setReducerClass(KeyValueSortReducer.class);
        convertWordCountJobOutputToHFileJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
        convertWordCountJobOutputToHFileJob.setMapOutputValueClass(Put.class);
        
        //以第一個(gè)Job的輸出做為第二個(gè)Job的輸入
        FileInputFormat.addInputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[1]));
        FileOutputFormat.setOutputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[2]));
        //創(chuàng)建HBase的配置對象
        Configuration hbaseConfiguration=HBaseConfiguration.create();
        //創(chuàng)建目標(biāo)表對象
        HTable wordCountTable =new HTable(hbaseConfiguration, "word_count");
        HFileOutputFormat.configureIncrementalLoad(convertWordCountJobOutputToHFileJob,wordCountTable);
       
        //提交第二個(gè)job
        int convertWordCountJobOutputToHFileJobResult=convertWordCountJobOutputToHFileJob.waitForCompletion(true)?0:1;
        
        //當(dāng)?shù)诙€(gè)job結(jié)束之后,調(diào)用BulkLoad方式來將MR結(jié)果批量入庫
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConfiguration);
        //第一個(gè)參數(shù)為第二個(gè)Job的輸出目錄即保存HFile的目錄,第二個(gè)參數(shù)為目標(biāo)表
        loader.doBulkLoad(new Path(dfsArgs[2]), wordCountTable);
        
        //最后調(diào)用System.exit進(jìn)行退出
        System.exit(convertWordCountJobOutputToHFileJobResult);
		
	}
 
}

??比如原始的輸入數(shù)據(jù)的目錄為:/rawdata/test/wordcount/20131212
??中間結(jié)果數(shù)據(jù)保存的目錄為:/middata/test/wordcount/20131212
??最終生成的 HFile 保存的目錄為:/resultdata/test/wordcount/20131212
??運(yùn)行上面的 Job 的方式如下:hadoop jar test.jar /rawdata/test/wordcount/20131212 /middata/test/wordcount/20131212 /resultdata/test/wordcount/20131212

(1)說明與注意事項(xiàng)

??(1)HFile 方式在所有的加載方案里面是最快的,不過有個(gè)前提 —— 數(shù)據(jù)是第一次導(dǎo)入,表是空的。如果表中已經(jīng)有了數(shù)據(jù)。HFile 再導(dǎo)入到 Hbase 的表中會(huì)觸發(fā) split 操作。

??(2)最終輸出結(jié)果,無論是 map 還是 reduce,輸出部分 key 和 value 的類型必須是: <ImmutableBytesWritable, KeyValue> 或者 <ImmutableBytesWritable, Put>。否則報(bào)這樣的錯(cuò)誤:

java.lang.IllegalArgumentException: Can't read partitions file
...
Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable

??(3)最終輸出部分,Value 類型是 KeyValue 或 Put,對應(yīng)的 Sorter 分別是 KeyValueSortReducerPutSortReducer,這個(gè) SorterReducer 可以不指定,因?yàn)樵创a中已經(jīng)做了判斷。

if (KeyValue.class.equals(job.getMapOutputValueClass())) {
	job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
	job.setReducerClass(PutSortReducer.class);
} else {
	LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}

??(4) MR 例子中 job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat 只適合一次對單列族組織成 HFile 文件,多列簇需要起多個(gè) job,不過新版本的 Hbase(這句話是作者在四年前說的,現(xiàn)在最新的版本到達(dá)了什么程度我還沒有去細(xì)究)已經(jīng)解決了這個(gè)限制。

??(5) MR 例子中最后生成 HFile 存儲(chǔ)在 HDFS 上,輸出路徑下的子目錄是各個(gè)列族。如果對 HFile 進(jìn)行入庫 HBase,相當(dāng)于移動(dòng) HFileHBaseRegion 中,HFile 子目錄的列族內(nèi)容沒有了。

??(6)最后一個(gè) Reduce 沒有 setNumReduceTasks 是因?yàn)?,該設(shè)置由框架根據(jù) region 個(gè)數(shù)自動(dòng)配置的。

??(7)下邊配置部分,注釋掉的其實(shí)寫不寫都無所謂,因?yàn)榭丛创a就知道 configureIncrementalLoad 方法已經(jīng)把固定的配置全配置完了,不固定的部分才需要手動(dòng)配置。

public class HFileOutput {
        //job 配置
	public static Job configureJob(Configuration conf) throws IOException {
		Job job = new Job(configuration, "countUnite1");
		job.setJarByClass(HFileOutput.class);
                //job.setNumReduceTasks(2);  
		//job.setOutputKeyClass(ImmutableBytesWritable.class);
		//job.setOutputValueClass(KeyValue.class);
		//job.setOutputFormatClass(HFileOutputFormat.class);
 
		Scan scan = new Scan();
		scan.setCaching(10);
		scan.addFamily(INPUT_FAMILY);
		TableMapReduceUtil.initTableMapperJob(inputTable, scan,
				HFileOutputMapper.class, ImmutableBytesWritable.class, LongWritable.class, job);
		//這里如果不定義reducer部分,會(huì)自動(dòng)識(shí)別定義成KeyValueSortReducer.class 和PutSortReducer.class
                job.setReducerClass(HFileOutputRedcuer.class);
		//job.setOutputFormatClass(HFileOutputFormat.class);
		HFileOutputFormat.configureIncrementalLoad(job, new HTable(
				configuration, outputTable));
		HFileOutputFormat.setOutputPath(job, new Path());
                //FileOutputFormat.setOutputPath(job, new Path()); //等同上句
		return job;
	}
 
	public static class HFileOutputMapper extends
			TableMapper<ImmutableBytesWritable, LongWritable> {
		public void map(ImmutableBytesWritable key, Result values,
				Context context) throws IOException, InterruptedException {
			//mapper邏輯部分
			context.write(new ImmutableBytesWritable(Bytes()), LongWritable());
		}
	}
 
	public static class HFileOutputRedcuer extends
			Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
		public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,
				Context context) throws IOException, InterruptedException {
                        //reducer邏輯部分
			KeyValue kv = new KeyValue(row, OUTPUT_FAMILY, tmp[1].getBytes(),
					Bytes.toBytes(count));
			context.write(key, kv);
		}
	}
}

上述內(nèi)容來自:HBase 寫優(yōu)化之 BulkLoad 實(shí)現(xiàn)數(shù)據(jù)快速入庫

(2)自我實(shí)踐
[hadoop@h71 ~]$ vi he.txt
hello world
hello hadoop
hello hive
[hadoop@h71 ~]$ hadoop fs -mkdir /rawdata
[hadoop@h71 ~]$ hadoop fs -put he.txt /rawdata
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/javac GeneratePutHFileAndBulkLoadToHBase.java
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar GeneratePutHFileAndBulkLoadToHBase*class
[hadoop@h71 hui]$ hadoop jar xx.jar GeneratePutHFileAndBulkLoadToHBase /rawdata /middata /resultdata

會(huì)報(bào)錯(cuò):

Exception in thread "main" java.lang.IllegalArgumentException: No regions passed
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.writePartitions(HFileOutputFormat2.java:315)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.configurePartitioner(HFileOutputFormat2.java:573)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.configureIncrementalLoad(HFileOutputFormat2.java:421)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.configureIncrementalLoad(HFileOutputFormat2.java:386)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.configureIncrementalLoad(HFileOutputFormat.java:90)
        at TestHFileToHBase.main(TestHFileToHBase.java:57)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

原來需要先建表然后再執(zhí)行之前的命令:

hbase(main):031:0> create 'word_count','cf'
[hadoop@h71 ~]$ hadoop fs -lsr /middata
-rw-r--r--   2 hadoop supergroup          0 2017-03-20 10:36 /middata/_SUCCESS
-rw-r--r--   2 hadoop supergroup         32 2017-03-20 10:36 /middata/part-r-00000
[hadoop@h71 ~]$ hadoop fs -cat /middata/part-r-00000
hadoop  1
hello   3
hive    1
world   1
[hadoop@h71 ~]$ hadoop fs -lsr /resultdata
-rw-r--r--   2 hadoop supergroup          0 2017-03-20 10:36 /resultdata/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2017-03-20 10:36 /resultdata/cf
# 這里的 cf 是空目錄,是因?yàn)?bulkload 會(huì)將指定目錄下的 Hfile 格式的文件移動(dòng)到 hbase 中,所以會(huì)是空目錄,當(dāng)用 mr 生成 HFile 文件是 cf 目錄下會(huì)有 Hfile 格式的文件存在,并且無法用 hadoop fs -cat 查看,如果非要用的話會(huì)是亂碼
hbase(main):012:0> scan 'word_count'
ROW                                         COLUMN+CELL                                                                                                                 
 hadoop                                     column=cf:count, timestamp=1489973703632, value=\x00\x00\x00\x01                                                            
 hello                                      column=cf:count, timestamp=1489973703632, value=\x00\x00\x00\x03                                                            
 hive                                       column=cf:count, timestamp=1489973703632, value=\x00\x00\x00\x01                                                            
 world                                      column=cf:count, timestamp=1489973703632, value=\x00\x00\x00\x01
# 發(fā)現(xiàn)插入的數(shù)據(jù)是字節(jié)類型的,后將代碼中的 put.add(family, qualifier, hbaseValue); 改為 put.add(family, qualifier, Bytes.toBytes("5"));)

# 再執(zhí)行上述指令得到:
hbase(main):032:0> scan 'word_count'
ROW                                         COLUMN+CELL                                                                                                                 
 hadoop                                     column=cf:count, timestamp=1489977438537, value=5                                                                           
 hello                                      column=cf:count, timestamp=1489977438537, value=5                                                                           
 hive                                       column=cf:count, timestamp=1489977438537, value=5                                                                           
 world                                      column=cf:count, timestamp=1489977438537, value=5
 
# 后來又將int count=Integer.valueOf(wordCountArray[1]);修改為String count=wordCountArray[1];
# 再執(zhí)行上述指令得到:
hbase(main):007:0> scan 'word_count'
ROW                                         COLUMN+CELL                                                                                                                 
 hadoop                                     column=cf:count, timestamp=1489748145527, value=1                                                                           
 hello                                      column=cf:count, timestamp=1489748145527, value=3                                                                           
 hive                                       column=cf:count, timestamp=1489748145527, value=1                                                                           
 world                                      column=cf:count, timestamp=1489748145527, value=1 

注:我不明白原作者為什么非要整成 int 類型,這樣導(dǎo)入到 hbase 中就成 \x00\x00\x00\x01 了啊,后來搜索到一篇文章,可以看一下:【hbase】——bulk load導(dǎo)入數(shù)據(jù)時(shí)value=\x00\x00\x00\x01問題解析

??最終輸出部分,Value 類型是 KeyValuePut,對應(yīng)的 Sorter 分別是 KeyValueSortReducerPutSortReducer,這個(gè) SorterReducer 可以不指定,因?yàn)樵创a中已經(jīng)做了判斷:

??于是我想將 Put 改為 KeyValue 輸出為 HFile:于是修改 ConvertWordCountOutToHFileMapper 類的代碼為:

	public static class ConvertWordCountOutToHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>
	{
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			String wordCountStr=value.toString();
			String[] wordCountArray=wordCountStr.split("\t");
			String word=wordCountArray[0];
			String count=wordCountArray[1];
			
			//創(chuàng)建HBase中的RowKey
			byte[] rowKey=Bytes.toBytes(word);
			ImmutableBytesWritable rowKeyWritable=new ImmutableBytesWritable(rowKey);
			byte[] family=Bytes.toBytes("cf");
			byte[] qualifier=Bytes.toBytes("count");
			byte[] hbaseValue=Bytes.toBytes(count);
			// Put 用于列簇下的多列提交,若只有一個(gè)列,則可以使用 KeyValue 格式
			 KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);
//			Put put=new Put(rowKey);
//			put.add(family, qualifier, hbaseValue);
			context.write(rowKeyWritable, keyValue);
		}
	}

??執(zhí)行上面命令后會(huì)報(bào)這個(gè)錯(cuò):

Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.hbase.client.Put, received org.apache.hadoop.hbase.KeyValue
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1078)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
        at GeneratePutHFileAndBulkLoadToHBase$ConvertWordCountOutToHFileMapper.map(GeneratePutHFileAndBulkLoadToHBase.java:89)
        at GeneratePutHFileAndBulkLoadToHBase$ConvertWordCountOutToHFileMapper.map(GeneratePutHFileAndBulkLoadToHBase.java:67)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

??然后我將主方法中的 convertWordCountJobOutputToHFileJob.setMapOutputValueClass(Put.class); 改為 convertWordCountJobOutputToHFileJob.setMapOutputValueClass(KeyValue.class); 再執(zhí)行這才好使了。原來這些后面跟的這些 class 都不是瞎寫的啊,一開始我還以為是隨便寫的吶。。。

job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);    //代碼中的主類
job.setMapperClass(WordCountMapper.class);  //第一個(gè)mr中的map類名
job.setReducerClass(WordCountReducer.class);   //第一個(gè)mr中的reduce類名
job.setOutputKeyClass(Text.class);  //我感覺這個(gè)是源碼中的類名,并不是轄寫的啊
job.setOutputValueClass(IntWritable.class);   //同上
convertWordCountJobOutputToHFileJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);  //代碼中的主類
convertWordCountJobOutputToHFileJob.setMapperClass(ConvertWordCountOutToHFileMapper.class);  //第二個(gè)mr中的map類名
convertWordCountJobOutputToHFileJob.setMapOutputKeyClass(ImmutableBytesWritable.class);  //源碼中的類名

??對于 Hbase的ImmutableBytesWritable 類型,如果直接 Sysout 輸出的是一個(gè)類似于16進(jìn)制的 byte[];

??假設(shè)我們獲得了 ImmutableBytesWritable aa; 我們一般先將 aa 通過 byte[] bb = aa.get() 得到 byte[] 類型;然后通過 String cc = Bytes.toString(bb) 將其解析為 String;

2. 案例二:
(1)MR生成HFile文件
[hadoop@h71 hui]$ vi TestHFileToHBase.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestHFileToHBase {

        public static class TestHFileToHBaseMapper extends Mapper {

                protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                        String[] values = value.toString().split(" ", 2);
                        byte[] row = Bytes.toBytes(values[0]);
                        ImmutableBytesWritable k = new ImmutableBytesWritable(row);
                        KeyValue kvProtocol = new KeyValue(row, "PROTOCOLID".getBytes(), "PROTOCOLID".getBytes(), values[1]
                                        .getBytes());
                        context.write(k, kvProtocol);
                }
        }

        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
                Configuration conf = HBaseConfiguration.create();
                Job job = new Job(conf, "TestHFileToHBase");
                job.setJarByClass(TestHFileToHBase.class);

                job.setOutputKeyClass(ImmutableBytesWritable.class);
                job.setOutputValueClass(KeyValue.class);

                job.setMapperClass(TestHFileToHBaseMapper.class);
                job.setReducerClass(KeyValueSortReducer.class);
//                job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.class);
                job.setOutputFormatClass(HFileOutputFormat.class);
                // job.setNumReduceTasks(4);
                // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class);

//                 HBaseAdmin admin = new HBaseAdmin(conf);
                HTable table = new HTable(conf, "hua");

                 HFileOutputFormat.configureIncrementalLoad(job, table);

                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));

                System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

}
[hadoop@h71 ~]$ vi he.txt
hello world
hello hadoop
hello hive
[hadoop@h71 ~]$ hadoop fs -mkdir /rawdata
[hadoop@h71 ~]$ hadoop fs -put he.txt /rawdata
hbase(main):020:0> create 'hua','PROTOCOLID'
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/javac TestHFileToHBase.java 
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar TestHFileToHBase*class
[hadoop@h71 hui]$ hadoop jar xx.jar TestHFileToHBase /rawdata /middata

報(bào)錯(cuò):

Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.hbase.io.ImmutableBytesWritable, received org.apache.hadoop.io.LongWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
        at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

解決:

# 于是我把
public static class TestHFileToHBaseMapper extends Mapper {
# 修改為
public static class TestHFileToHBaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>{
# 就好使了。。。
[hadoop@h71 ~]$ hadoop fs -lsr /middata
drwxr-xr-x   - hadoop supergroup          0 2017-03-17 20:50 /middata/PROTOCOLID
-rw-r--r--   2 hadoop supergroup       1142 2017-03-17 20:50 /middata/PROTOCOLID/65493afaefac43528c554d0b8056f1e3
-rw-r--r--   2 hadoop supergroup          0 2017-03-17 20:50 /middata/_SUCCESS
(/middata/PROTOCOLID/65493afaefac43528c554d0b8056f1e3是個(gè)Hfile格式的文件,無法用hadoop fs -cat查看,否則會(huì)出現(xiàn)亂碼)
(2)HFile入庫到HBase

??原文代碼有很多問題,修改后為:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestLoadIncrementalHFileToHBase {

        public static void main(String[] args) throws Exception {
                Configuration conf = HBaseConfiguration.create();
                String[] dfsArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
                HTable table = new HTable(conf,"hua");
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                loader.doBulkLoad(new Path(dfsArgs[0]), table);
        }
}
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/javac TestLoadIncrementalHFileToHBase.java
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/java TestLoadIncrementalHFileToHBase /middata/PROTOCOLID
執(zhí)行后在hbase shell端查看表hua無數(shù)據(jù)。。因執(zhí)行:
[hadoop@h71 hui]$ /usr/jdk1.7.0_25/bin/java TestLoadIncrementalHFileToHBase /middata
hbase(main):073:0> scan 'hua'
ROW                                         COLUMN+CELL                                                                                                                 
 hello                                      column=PROTOCOLID:PROTOCOLID, timestamp=1489758507378, value=hive
(查看hua表h只有一條數(shù)據(jù),一開始還很困惑,我的he.txt中有三條數(shù)據(jù)啊,為何只導(dǎo)入了一條數(shù)據(jù)啊,后來突然明白了hbase將he.txt中三行數(shù)據(jù)的hello作為rowkey,則三行數(shù)據(jù)的rowkey都一樣了?。?

上述內(nèi)容來自:生成HFile以及入庫到HBase

3. 案例三:用 Scala 程序通過 Spark 完成

??為避免數(shù)據(jù)都寫入一個(gè) region,造成 Hbase 的數(shù)據(jù)傾斜問題。在當(dāng)前 HMaster 活躍的節(jié)點(diǎn)上,創(chuàng)建預(yù)分區(qū)表:

create ‘userprofile_labels', { NAME => "f", BLOCKCACHE => "true", BLOOMFILTER => "ROWCOL", COMPRESSION => 'snappy', IN_MEMORY => 'true' }, { NUMREGIONS => 10, SPLITALGO => 'HexStringSplit' }

??將待同步的數(shù)據(jù)寫入 HFile,HFile 中的數(shù)據(jù)以 key-value 鍵值對方式存儲(chǔ),然后將 HFile 數(shù)據(jù)使用 BulkLoad 批量寫入 HBase 集群中。 Scala 腳本執(zhí)行如下:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.Hbase.client.ConnectionFactory
import org.apache.hadoop.Hbase.{HbaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.Hbase.io.ImmutableBytesWritable
import org.apache.hadoop.Hbase.mapreduce.{HFileOutFormat2, LoadIncremectalHFiles}
import org.apache.hadoop.Hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.spark.sql.SparkSession

object Hive2HBase {
  def main(args: Array[String]): Unit = {
    // 傳入日期參數(shù) 和 當(dāng)前活躍的master節(jié)點(diǎn)
    val data_date = arg(0)
    val node = args(1)  //當(dāng)前活躍的節(jié)點(diǎn)ip

    val spark = SparkSession
      .builder()
      .appName("Hive2Hbase")
      .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .config("spark.storage.memoryFraction", "0.1")
      .config("spark.shuffle.memoryFraction", "0.7")
      .config("spark.memory.useLegacyMode", "ture")
      .enableHiveSupport()
      .getOrCreate()

    // 創(chuàng)建HBase的配置
    val conf = HBaseConfiguration.create()
        conf.set("HBase.zookeeper.quorum", "10.xxx.xxx.xxx, 10.xxx.xxx.xxx")
        conf.set("HBase.zookeeper.property.clientPort", "8020")

    //為了預(yù)防hfile文件數(shù)過多無法進(jìn)行導(dǎo)入,設(shè)置參數(shù)值
    conf.setInt("HBase.hregion.max.filesize", 10737418240)
    conf.setInt("HBase.mapreduce.bulkload.max.hfiles.perRegion.perRegion.perFamily", 3200)

    val Data = spark.sql(s"select userid,userlabels from dw.userprofile_usergroup_labels_all where data_date='${data_date}‘”)
    val dataRdd = Data.rdd.flatMap(row => 
      val rowkey =  row.getAs[String]("userid".toLowerCase)
      val tagsmap = row.getAs[Map[String, Object]]("userlabels".toLowerCase)
      val sbkey = new StringBuffer()  // 對MAP結(jié)構(gòu)轉(zhuǎn)化 a->b 'a':'b'
      val sbvalue = new StringBuffer()
      for ((key, value) <- tagsmap) {
        sbkey.append(key + ":")
        val labelght = if (value == "") {
          "-999999"
        } else {
          value
        }
        sbvalue.append(labelght + ":")
      )
      val item = sbkey.substring(0, sbkey.length-1)
      val score = sbvalue.substring(0, sbvalue.length-1)
      Array(
        (rowkey,("f","i",item)),
        (rowkey,("f","s",score))
      )
    })
    
    // 將rdd轉(zhuǎn)換成HFile需要的格式
    val rdds = dataRdd.fileter(x=>x._1 != null).sortBy(x=>(x._1,x._2._1,x._2._2)).map(x => {
      //KeyValue的實(shí)例為value
      val rowKey = Bytes.toBytes(x._1)
      val family = Bytes.toBytes(x._2._1)
      val colum = Bytes.toBytes(x._2._2)
      val value = Bytes.toBytes(x._2._3.toString)
      (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, colum, value))
    ))

    // 文件保存在hdfs的位置
    val locatedir = "hdfs://" + node.toString + ":8020/user/bulkload/hfile/usergroup_HBase_" + data_date

    // 在locatedir生成的Hfile文件
    rdds.saveAsNewAPIHadoopFile(locatedir,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      conf)
    //HFile導(dǎo)入到HBase
    val load = new LoadIncrementalHFiles(conf)

    //HBase的表名
    val tableName = "userprofile_labels"
    //創(chuàng)建HBase的鏈接,利用默認(rèn)的配置文件,讀取HBase的master地址
    val conn = ConnectionFactory.createConnection(conf)
    //根據(jù)表名獲取表
    val table = conn.getTable(TableName.valueOf(tableName))

    try {
      //獲取HBase表的region分布
      val regionLocation = conn.getregionLocation(TableName.valueOf(tableName))
      //創(chuàng)建一個(gè)hadoop的mapreduce的job
      val job = Job.getInstance(conf)
      //設(shè)置job名稱,任意命名
      job.setJobName("Hive2HBase")
      //輸出文件的內(nèi)容KeyValue
      job.setMapOutputKeyClass(classOf[KeyValue])
      //設(shè)置文件輸出key,outkey要用ImmutableBytesWritable
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      //配置HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocation)
      //開始導(dǎo)入
      load.doBulkLoad(new Path(locatedir), conn.getAdmin, table, regionLocation)
    } finally {
      table.close()
      conn.close()
    )
    spark.close()
  }
}

上述內(nèi)容來自書籍:《用戶畫像方法論與工程化解決方案》

三、補(bǔ)充:Region分裂(Split)

參考:hbase的split策略和預(yù)分區(qū) 和 【原創(chuàng)】HBase的分裂(Split)與緊縮(Compaction)

??Region分裂是HBase最核心的功能之一,是實(shí)現(xiàn)分布式可擴(kuò)展性的基礎(chǔ)。最初,每個(gè)Table只有一個(gè)Region,隨著數(shù)據(jù)的不斷寫入,HBase根據(jù)一定的觸發(fā)條件和一定的分裂策略將Hbase的一個(gè)region分裂成兩個(gè)子region并對父region進(jìn)行清除,通過HBase的balance機(jī)制,實(shí)現(xiàn)分裂后的region負(fù)載均衡到對應(yīng)RegionServer上。若一個(gè)table沒有進(jìn)行預(yù)分區(qū),那么只有一個(gè)region,初始化表時(shí)數(shù)據(jù)的讀寫都命中同一個(gè)regionServer,會(huì)造成熱點(diǎn)問題,且region進(jìn)行split時(shí)集群是不可用的,頻繁的split也會(huì)造成大量的集群I/O,性能很低。目前常見的HBase分裂方式有三種:

  • Per-Spliting(預(yù)分區(qū))
  • Auto-Spliting(自動(dòng)分裂)
  • Force-Spliting(強(qiáng)制分裂)

??Per-Spliting指的是在HBase創(chuàng)建Table時(shí),指定好Table的Region的個(gè)數(shù),生成多個(gè)Region。這么做的好處是一方面可以避免熱點(diǎn)數(shù)據(jù)的寫入問題(只有一個(gè)region,寫數(shù)據(jù)量大時(shí)其余RegionServer就是空閑的),另一方面減少Region的Split幾率,同時(shí)減少消耗集群的系統(tǒng)資源(IO,網(wǎng)絡(luò)),減少因Split暫停Region的上線造成對HBase讀寫數(shù)據(jù)的影響。

??HBase默認(rèn)建表時(shí)只有一個(gè)Region,此時(shí)的RowKey是沒有邊界的,即沒有StartKey和EndKey。進(jìn)行預(yù)分區(qū)時(shí),需要配置切分點(diǎn),使得HBase知道在哪個(gè)RowKey點(diǎn)做切分。hbase提供了兩種pre-split算法:HexStringSplit和UniformSplit,前者適用于十六進(jìn)制字符的rowkey,后者適用于隨機(jī)字節(jié)數(shù)組的rowkey。

//創(chuàng)建一個(gè)名為hex_test的表,有兩個(gè)列簇info和desc,可存3個(gè)版本的數(shù)據(jù),副本為2,預(yù)先指定10個(gè)region,且split算法為HexStringSplit
create 'hex_test',{NAME=>'info',VERSIONS=>3},{NAME=>'desc',VERSIONS=>3},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit',REGION_REPLICATION=>2}

//UniformSplit
create 'uniform_test',{NAME=>'info',VERSIONS=>3},{NAME=>'desc',VERSIONS=>3},{NUMREGIONS=>10,SPLITALGO=>'UniformSplit',REGION_REPLICATION=>2}

??自動(dòng)分裂指的是隨著不斷向表中寫入數(shù)據(jù),Region也會(huì)不斷增大,HBase根據(jù)觸發(fā)的分裂策略自動(dòng)分裂Region,當(dāng)前HBase已經(jīng)有6中分裂觸發(fā)的策略,不同版本中配置的分裂策略不同。文章來源地址http://www.zghlxwxcb.cn/news/detail-496727.html

// 強(qiáng)制分裂
split 'forced_table', 'b' //其中forced_table 為要split的table , ‘b’ 為split 點(diǎn)

// 我們還可以通過配置hbase.regionserver.region.split.policy指定自己的split策略

<!--定時(shí)切分執(zhí)行類-->
<property>
    <name>hbase.regionserver.region.split.policy</name>
    <value>org.apache.hadoop.hbase.regionserver.TimingRegionSplitPolicy</value>
</property>
<!--定時(shí)切分時(shí)間-->
<property>
     <name>hbase.regionserver.region.split.startTime</name>
     <value>02:00:00</value>
</property>
<property>
    <name>hbase.regionserver.region.split.endTime</name>
    <value>04:00:00</value>
</property>

到了這里,關(guān)于Hbase的bulkload流程與實(shí)踐的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Hadoop/HDFS/MapReduce/Spark/HBase重要知識(shí)點(diǎn)整理

    Hadoop/HDFS/MapReduce/Spark/HBase重要知識(shí)點(diǎn)整理

    本復(fù)習(xí)提綱主要參考北京大學(xué)計(jì)算機(jī)學(xué)院研究生課程《網(wǎng)絡(luò)大數(shù)據(jù)管理與應(yīng)用》課程資料以及廈門大學(xué)計(jì)算機(jī)科學(xué)系研究生課程 《大數(shù)據(jù)技術(shù)基礎(chǔ)》相關(guān)材料整理而成,供廣大網(wǎng)友學(xué)習(xí)參考,如有版權(quán)問題請聯(lián)系作者刪除:guanmeige001@pku.edu.cn Hadoop簡介 Hadoop的功能和作用: 高

    2024年02月02日
    瀏覽(48)
  • 《Git入門實(shí)踐教程》前言+目錄

    版本控制系統(tǒng)(VCS)在項(xiàng)目開發(fā)中異常重要,但和在校大學(xué)生的交流中知道,這個(gè)重要方向并未受到重視。具備這一技能,既是項(xiàng)目開發(fā)能力的體現(xiàn),也可為各種面試加碼。在學(xué)習(xí)體驗(yàn)后知道,Git多樣化平臺(tái)、多種操作方式、豐富的資源為業(yè)內(nèi)人士提供了方便的同時(shí),也造成

    2024年02月10日
    瀏覽(96)
  • FPGA學(xué)習(xí)實(shí)踐之旅——前言及目錄

    很早就有在博客中記錄技術(shù)細(xì)節(jié),分享一些自己體會(huì)的想法,拖著拖著也就到了現(xiàn)在。畢業(yè)至今已經(jīng)半年有余,隨著項(xiàng)目越來越深入,感覺可以慢慢進(jìn)行總結(jié)工作了。趁著2024伊始,就先開個(gè)頭吧,這篇博客暫時(shí)作為匯總篇,記錄在這幾個(gè)月以及之后從FPGA初學(xué)者到也算有一定

    2024年02月03日
    瀏覽(100)
  • 大數(shù)據(jù)處理技術(shù)作業(yè)——使用HBase&MongoDB&MapReduce進(jìn)行數(shù)據(jù)存儲(chǔ)和管理

    大數(shù)據(jù)處理技術(shù)作業(yè)——使用HBase&MongoDB&MapReduce進(jìn)行數(shù)據(jù)存儲(chǔ)和管理

    寫這篇文章的目的,主要是為了記錄一下這次作業(yè)歷程,并且筆者了解到很多同志飽受作業(yè)折磨,遂簡單分享一下個(gè)人完成作業(yè)的歷程,以下內(nèi)容僅為本人的一些亂七八糟的想法, 僅作參考O(∩_∩)O 1、本作業(yè)的鏈接 【完成本次作業(yè)用到的代碼文件,列出網(wǎng)盤鏈接,https://p

    2024年02月07日
    瀏覽(23)
  • 基于Hadoop的MapReduce網(wǎng)站日志大數(shù)據(jù)分析(含預(yù)處理MapReduce程序、hdfs、flume、sqoop、hive、mysql、hbase組件、echarts)

    基于Hadoop的MapReduce網(wǎng)站日志大數(shù)據(jù)分析(含預(yù)處理MapReduce程序、hdfs、flume、sqoop、hive、mysql、hbase組件、echarts)

    需要本項(xiàng)目的可以私信博主?。?! 本項(xiàng)目包含:PPT,可視化代碼,項(xiàng)目源碼,配套Hadoop環(huán)境(解壓可視化),shell腳本,MapReduce代碼,文檔以及相關(guān)說明教程,大數(shù)據(jù)集! 本文介紹了一種基于Hadoop的網(wǎng)站日志大數(shù)據(jù)分析方法。本項(xiàng)目首先將網(wǎng)站日志上傳到HDFS分布式文件系統(tǒng)

    2024年02月16日
    瀏覽(110)
  • 大數(shù)據(jù)期資料2023 Beta版 - Hadoop、HDFS、MapReduce、Hive、ZooKeeper、Kafka、HBase詳解

    大數(shù)據(jù)期資料2023 Beta版 - Hadoop、HDFS、MapReduce、Hive、ZooKeeper、Kafka、HBase詳解

    了解大數(shù)據(jù)概念、Hadoop、HDFS、MapReduce、Hive、ZooKeeper、Kafka、HBase等技術(shù),包括特點(diǎn)、命令操作和啟動(dòng)關(guān)閉方法。獲取2023年大數(shù)據(jù)資料Beta版。

    2024年02月06日
    瀏覽(177)
  • MapReduce概述及工作流程

    MapReduce概述及工作流程

    mapreduce原語(獨(dú)創(chuàng)) mapreduce工作流程(重點(diǎn)) MR作業(yè)提交流程(重點(diǎn)) YARN RM-HA搭建(熟練) 運(yùn)行自帶的wordcount(了解) 動(dòng)手寫wordcount(熟練) MapReduce原語 hadoop MapReduce框架可以讓你的應(yīng)用在集群中 可靠地 容錯(cuò)地 并行 處理TB級(jí)別的數(shù)據(jù) 1024TB=1PB? 1024PB=1EB? 1024EB=1ZB MapReduc

    2023年04月08日
    瀏覽(23)
  • MapReduce 原理與實(shí)踐

    MapReduce 原理與實(shí)踐

    Hadoop MapReduce 是一個(gè) 編程框架 ,它可以輕松地編寫應(yīng)用程序,以可靠的、容錯(cuò)的方式處理大量的數(shù)據(jù)(數(shù)千個(gè)節(jié)點(diǎn))。 正如其名,MapReduce 的工作模式主要分為 Map 階段和 Reduce 階段 。 一個(gè) MapReduce 任務(wù)(Job)通常將輸入的數(shù)據(jù)集分割成獨(dú)立的塊,這些塊被 map 任務(wù)以完全并行的

    2024年02月06日
    瀏覽(48)
  • MapReduce初級(jí)編程實(shí)踐

    MapReduce初級(jí)編程實(shí)踐

    ubuntu18.04虛擬機(jī)和一個(gè)win10物理主機(jī) 編程環(huán)境 IDEA 虛擬機(jī)ip:192.168.1.108 JDK:1.8 使用Java編程一個(gè)WordCount程序,并將該程序打包成Jar包在虛擬機(jī)內(nèi)執(zhí)行 首先使用IDEA創(chuàng)建一個(gè)Maven項(xiàng)目 在pom.xml文件內(nèi)引入依賴和打包為Jar包的插件: 編寫對應(yīng)的程序: MyProgramDriver類用于執(zhí)行程序入口

    2023年04月26日
    瀏覽(21)
  • 實(shí)驗(yàn)5:MapReduce 初級(jí)編程實(shí)踐

    由于CSDN上傳md文件總是會(huì)使圖片失效 完整的實(shí)驗(yàn)文檔地址如下: https://download.csdn.net/download/qq_36428822/85709497 實(shí)驗(yàn)內(nèi)容與完成情況: (一)編程實(shí)現(xiàn)文件合并和去重操作 對于兩個(gè)輸入文件,即文件 A 和文件 B,請編寫 MapReduce 程序,對兩個(gè)文件進(jìn)行合并, 并剔除其中重復(fù)的內(nèi)

    2024年02月07日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包