十、MapReduce的特殊應(yīng)用場景
1、使用MapReduce進行join操作
MapReduce可以對海量數(shù)據(jù)進行計算,但是有些情況下,計算的結(jié)果可能來自于多個文件,每個文件的數(shù)據(jù)格式是不一致,但是多個文件存在某種關(guān)聯(lián)關(guān)系,類似于MySQL中外鍵關(guān)系,如果想計算這樣的結(jié)果,MR程序也是支持的。這種計算我們稱之為join計算。
MR的join根據(jù)join數(shù)據(jù)的位置分為兩種情況:1、Map端的Join操作,2、Reduce端的join操作。
第一種Join使用:Reduce端的Join操作
思維就是在map端將多個不同格式的文件全部讀取到,然后根據(jù)不同文件的格式對數(shù)據(jù)進行切割,切割完成以后,將數(shù)據(jù)進行封裝,然后以多個文件的共同字段當作key,剩余字段當作value發(fā)送給reduce。
reduce端根據(jù)共同的key值,把value數(shù)據(jù)進行聚合,聚合完成以后,進行多文件的join操作。
Reduce端的join存在的問題:非常容易出現(xiàn)數(shù)據(jù)傾斜問題:
如果多個進行join的文件數(shù)據(jù)量相差過大,就非常容易出現(xiàn)數(shù)據(jù)傾斜問題 —— 大文件join小文件容易出現(xiàn)這個問題
假如order.txt文件300M,product.txt 10M
如果采用的默認切片機制,那么這兩個文件切成4片
order.txt 128M 128M 44M
product.txt 10m
Reduce階段也能會出現(xiàn)數(shù)據(jù)傾斜問題,不同key值對應(yīng)的數(shù)據(jù)量相差過大
案例分析:
/**
* 現(xiàn)在有兩個文件,第一個文件代表商品銷售數(shù)據(jù),另外一個文件代表商品的詳細信息
* 兩個文件的內(nèi)容分別如下:
* 1、order.txt 訂單文件---每一行數(shù)據(jù)的多個字段以\t分割
* order_id-訂單編號 pid--商品id account--商品的數(shù)量
* o001 p001 10
* o001 p002 5
* o002 p003 11
* o002 p002 1
* 2、product.txt 商品文件---每一行數(shù)據(jù)的多個字段是以空格進行分割的
* pid--商品id pname-商品的名字
* p001 小米
* p002 自行車
* p003 電視機
*
* 使用MR程序?qū)崿F(xiàn)如下的效果展示 最終的結(jié)果每一行以\t分割的
* order_id pid pname account
* o001 p001 小米 10
* o001 p002 自行車 5
*
* 核心邏輯:借助MapReduce實現(xiàn)一種類似于MySQL的多表連接查詢功能。
* MR實現(xiàn)有兩種方式:map端的join reduce端join
*/
package com.kang.join.reducce;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
/**
* MR的第一種join方式:reduce端的join
* 思維:
* 1、通過map階段讀取兩個文件的數(shù)據(jù)
* 2、map階段先獲取當前行kv到切片數(shù)據(jù)對應(yīng)的文件,然后根據(jù)文件進行不同方式的切割。
* 3、然后對切割的數(shù)據(jù)進行封裝(將數(shù)據(jù)傳輸?shù)絩educe進行聚合的),如果要在reduce端做join操作
* 需要在map端輸出數(shù)據(jù)時,以兩個文件的關(guān)聯(lián)字段當作key值進行傳輸,以兩個文件的剩余字段當作value傳輸
*
* 自定義JavaBean,JavaBean包含兩個文件的所有字段,同時還需要包含一個標識字段(數(shù)據(jù)來自于哪個文件的),
* 然后使用JavaBean封裝兩個文件的不同數(shù)據(jù)。
*/
public class FirstDriver {
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.31.104:9000");
Job job = Job.getInstance(configuration);
job.setJarByClass(FirstDriver.class);
FileInputFormat.setInputPaths(job,new Path("/join"));
job.setMapperClass(FirstMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(OrderProductBean.class);
job.setReducerClass(FirstReducer.class);
job.setOutputKeyClass(OrderProductBean.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(1);
Path path = new Path("/joinOutput");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.31.104:9000"), configuration, "root");
if (fs.exists(path)){
fs.delete(path,true);
}
FileOutputFormat.setOutputPath(job,path);
boolean flag = job.waitForCompletion(true);
System.exit(flag?0:1);
}
}
class FirstMapper extends Mapper<LongWritable, Text,Text,OrderProductBean>{
/**
* map方法讀取的每一行的kv數(shù)據(jù),kv數(shù)據(jù)可能是訂單文件的數(shù)據(jù),也可能是商品文件的數(shù)據(jù)
* @param key
* @param value
* @param context 上下文對象 context也可以獲取每一個kv對應(yīng)的切片中文件名
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, OrderProductBean>.Context context) throws IOException, InterruptedException {
//代表獲取當前kv數(shù)據(jù)的切片
FileSplit fileSplit = (FileSplit) context.getInputSplit();
//獲取kv數(shù)據(jù) 在切片中屬于哪個文件的
Path path = fileSplit.getPath();
//拿到文件的名字
String name = path.getName();
String line = value.toString();
//if如果屬于訂單文件數(shù)據(jù),如何切割 如何封裝
if (name.equals("order.txt")){
String[] array = line.split("\t");
String orderId = array[0];
String pId = array[1];
int account = Integer.parseInt(array[2]);
OrderProductBean orderProductBean = new OrderProductBean(orderId,pId,account,"order");
context.write(new Text(pId),orderProductBean);
}else {
//else代表是如果是商品文件,如何切割 如何封裝
String[] array = line.split(" ");
String pId = array[0];
String pName = array[1];
OrderProductBean orderProductBean = new OrderProductBean(pId,pName,"product");
context.write(new Text(pId),orderProductBean);
}
}
}
/**
* reduce端就是根據(jù)pid把訂單表和商品表對應(yīng)的信息聚合起來,聚合起來的結(jié)果肯定某一件商品的訂單信息和商品信息
* key values
* p001 o001,p001,10,order p001,小米,product
* p002 o001,poo2,5,order o002,p002,1,order p002,自行車,product
*/
class FirstReducer extends Reducer<Text,OrderProductBean, OrderProductBean, NullWritable>{
@Override
protected void reduce(Text key, Iterable<OrderProductBean> values, Reducer<Text, OrderProductBean, OrderProductBean, NullWritable>.Context context) throws IOException, InterruptedException {
//放當前商品id對應(yīng)的所有的訂單信息
List<OrderProductBean> orders = new ArrayList<>();
//當前商品的商品信息
OrderProductBean productBean = new OrderProductBean();//商品信息
/**
* MapReduce當中,values集合中的bean都是同一個bean
* 如果要把values的bean加到一個集合中,我們需要創(chuàng)建一個全新的bean,把values中bean的數(shù)據(jù)
* 復(fù)制到全新的bean當中 然后全新的bean加到集合中 這樣的話不會出現(xiàn)數(shù)據(jù)錯亂
*/
for (OrderProductBean bean : values) {
if (bean.getFlag().equals("order")){
OrderProductBean orderBean = new OrderProductBean();
try {
//BeanUtils是apache提供的一個工具類,工具類實現(xiàn)把一個Java對象的屬性復(fù)制到另外一個Java對象當中
BeanUtils.copyProperties(orderBean,bean);//bean復(fù)制給orderBean
orders.add(orderBean);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
}else {
try {
BeanUtils.copyProperties(productBean,bean);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
}
}
for (OrderProductBean order : orders) {
order.setpName(productBean.getpName());
context.write(order,NullWritable.get());
}
}
}
package com.kang.join.reducce;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* JavaBean是用來封裝兩個不同文件的數(shù)據(jù)的
* JavaBean包含兩個文件的所有字段
*/
public class OrderProductBean implements Writable {
private String orderId = "";
private String pId = "";
private String pName = "";
private Integer account = 0;
private String flag = "";//代表的是一個標識,標識是用來標識JavaBean封裝的是訂單數(shù)據(jù)還是商品數(shù)據(jù)
public OrderProductBean() {
}
/**
* 專門是用來封裝訂單數(shù)據(jù)文件信息的
* @param orderId
* @param pId
* @param account
* @param flag
*/
public OrderProductBean(String orderId, String pId, Integer account, String flag) {
this.orderId = orderId;
this.pId = pId;
this.account = account;
this.flag = flag;
}
/**
* 專門是用來封裝商品信息數(shù)據(jù)的
* @param pId
* @param pName
* @param flag
*/
public OrderProductBean(String pId, String pName, String flag) {
this.pId = pId;
this.pName = pName;
this.flag = flag;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getpId() {
return pId;
}
public void setpId(String pId) {
this.pId = pId;
}
public String getpName() {
return pName;
}
public void setpName(String pName) {
this.pName = pName;
}
public Integer getAccount() {
return account;
}
public void setAccount(Integer account) {
this.account = account;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public String toString() {
return orderId + "\t" + pId + "\t" + pName + "\t" + account;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeUTF(pId);
out.writeUTF(pName);
out.writeInt(account);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
orderId = in.readUTF();
pId = in.readUTF();
pName = in.readUTF();
account = in.readInt();
flag = in.readUTF();
}
}
第二種join使用:map端的join操作
map端的join適用于如果兩個需要做join操作文件數(shù)據(jù)量相差過大的情況下,map端的join操作可以盡最大可能避免map端的數(shù)據(jù)傾斜問題的出現(xiàn),如果使用map端的join的話,我們就不需要reduce階段。
map的join操作的核心邏輯是:將小文件緩存起來,大文件正常使用MR程序做切片做讀取。
在驅(qū)動程序中通過job.addCacheFile(new URI("XXXXX"))方法緩存小文件,小文件可以緩存無數(shù)個(小于100M)
在mapper階段的setup方法中通過context.getCacheFiles方法獲取到緩存的文件,然后通過IO流讀取小文件數(shù)據(jù),在MapTask中使用Map集合把小文件緩存起來,緩存的時候以小文件和大文件的關(guān)聯(lián)字段當作map集合的key值。
案例分析:
package com.kang.join.map;
import com.kang.join.reducce.FirstDriver;
import com.kang.join.reducce.OrderProductBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
/**
* Map端的join操作:
* 核心邏輯:在MR執(zhí)行的時候,將小文件在內(nèi)存中緩存起來,然后map階段從緩存當中把緩存的小文件讀取到,將小文件數(shù)據(jù)
* 在內(nèi)存保存起來,然后大文件正常使用MR程序進行切片讀取,map方法每讀取到一個大文件中一行數(shù)據(jù),將這一行數(shù)據(jù)
* 的關(guān)聯(lián)字段獲取到,然后根據(jù)關(guān)聯(lián)字段從map緩存的小文件數(shù)據(jù)中獲取對應(yīng)的數(shù)據(jù)添加上。
*/
public class SecondDriver {
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.31.104:9000");
Job job = Job.getInstance(configuration);
job.setJarByClass(FirstDriver.class);
/**
* 輸入文件只輸入大文件order.txt 小文件不這樣輸入,因為小文件這樣輸入會產(chǎn)生小切片,小切片會導(dǎo)致數(shù)據(jù)傾斜問題
*/
FileInputFormat.setInputPaths(job,new Path("/join/order.txt"));
job.addCacheFile(new URI("hdfs://192.168.31.104:9000/join/product.txt"));
job.setMapperClass(SecondMapper.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(OrderProductBean.class);
job.setNumReduceTasks(0);
Path path = new Path("/mapOutput");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.31.104:9000"), configuration, "root");
if (fs.exists(path)){
fs.delete(path,true);
}
FileOutputFormat.setOutputPath(job,path);
boolean flag = job.waitForCompletion(true);
System.exit(flag?0:1);
}
}
/**
* 做map端的join 最核心的邏輯就是 在map方法讀取大文件數(shù)據(jù)之前,先從緩存中把小文件獲取到,然后把小文件中數(shù)據(jù)先保存起來
* 保存的時候以key-value的形式保存 key是大小文件的關(guān)聯(lián)字段,value是剩余的數(shù)據(jù)
*
* Mapper中除了map方法以外 還有一個方法setup方法 setup方法會在map方法執(zhí)行之前執(zhí)行,而且只會執(zhí)行一次
*/
class SecondMapper extends Mapper<LongWritable,Text,NullWritable,OrderProductBean>{
private Map<String,String> product = new HashMap<>();//緩存的產(chǎn)品信息的屬性
/**
* setup方法每一個mapTask只執(zhí)行一次,在map方法之前執(zhí)行的
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void setup(Mapper<LongWritable, Text, NullWritable, OrderProductBean>.Context context) throws IOException, InterruptedException {
URI[] cacheFiles = context.getCacheFiles();
URI uri = cacheFiles[0];
String path = uri.getPath();
BufferedReader br = null;
try {
FileSystem fs = FileSystem.get(new URI(context.getConfiguration().get("fs.defaultFS")), context.getConfiguration(), "root");
FSDataInputStream inputStream = fs.open(new Path(path));
br = new BufferedReader(new InputStreamReader(inputStream));
String line = null;
while ((line = br.readLine()) != null){
String[] array = line.split(" ");
String pId = array[0];
String pName = array[1];
product.put(pId,pName);
}
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}finally {
br.close();
}
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, OrderProductBean>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] array = line.split("\t");
String orderId = array[0];
String pId = array[1];
int account = Integer.parseInt(array[2]);
String pName = product.get(pId);
OrderProductBean orderProductBean = new OrderProductBean(orderId,pId,pName,account);
context.write(NullWritable.get(),orderProductBean);
}
}
2、使用MapReduce的計數(shù)器
計數(shù)器是MR程序運行過程中提供的一種的特殊的計數(shù)機制,計數(shù)器可以幫助我們查看MR程序運行過程中的數(shù)據(jù)量的變化趨勢或者是我們感興趣的一些數(shù)據(jù)量的變化。
計數(shù)器在MR程序中自帶了很多計數(shù)器,計數(shù)器只能累加整數(shù)類型的值,最后把計數(shù)器輸出到我們的日志當中。
計數(shù)器是由三部分組成的:
- 計數(shù)器組:一個計數(shù)器組當中可以包含多個計數(shù)器
- 計數(shù)器:真正用來記錄記錄數(shù)的東西,計數(shù)器一般都是一個字符串的名字
- 計數(shù)器的值:計數(shù)器的值都是整數(shù)類型
計數(shù)器在map階段和reduce階段都有的,如果在map階段寫的計數(shù)器,是在map任務(wù)結(jié)束之后會輸出,如果在reduce階段使用的計數(shù)器,reduce階段執(zhí)行完成輸出。
計數(shù)器的使用有兩種方式:
-
1、直接使用字符串的形式進行操作
context.getCounter(String groupName,String counterName).increment(long num) -
2、使用Java的枚舉類的形式操作計數(shù)器 —— 先定義一個枚舉類
enum MyCounters{ UPPERCOUNT,LOWERCOUNT; } 然后在reduce中加入 context.getCounter(MyCounters.LOWERCOUNT).increment(1);
context.getCounter(enumObject).increment(long num)
計數(shù)器組的名字就是枚舉類的類名
計數(shù)器的名字就是枚舉類的對象名
計數(shù)器使用的時候,每一個MapTask或者ReduceTask單獨輸出它這個任務(wù)計數(shù)器的結(jié)果,等MR程序全部運行完成,計數(shù)器會把所有MapTask或者ReduceTask中相同的計數(shù)器結(jié)果累加起來,得到整個MR程序中計數(shù)器的結(jié)果。
合理利用計數(shù)器和查看計數(shù)器可以檢測MR程序運行有沒有數(shù)據(jù)傾斜問題的出現(xiàn)。
3、MapReduce做數(shù)據(jù)清洗
有時候需要把一些數(shù)據(jù)中不合法,非法的數(shù)據(jù)通過MapReduce程序清洗過濾掉,因此數(shù)據(jù)只需要清洗掉即可,不需要做任何的聚合操作,所以一般涉及到數(shù)據(jù)清洗操作只需要mapper階段即可,reduce階段我們不需要。
如果需要過濾數(shù)據(jù),只需要在mapepr階段將讀取到的數(shù)據(jù)按照指定的規(guī)則進行篩選,篩選符合條件的數(shù)據(jù)通過context.write寫出,不符合要求的數(shù)據(jù),只要不調(diào)用context,write方法自然而言就過濾掉了
案例分析:
package com.kang.filter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* 單詞文件中中包含大寫字母H的單詞全部過濾調(diào)用,只保留不包含大寫字母H的單詞
* 輸出的時候一個單詞輸出一行
*/
public class FilterDriver {
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.31.104:9000");
Job job = Job.getInstance(configuration);
job.setJarByClass(FilterDriver.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job,"/wordcount.txt");
job.setMapperClass(FilterMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
Path path = new Path("/wcFilterOutput");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.31.104:9000"), configuration, "root");
if (fs.exists(path)){
fs.delete(path);
}
FileOutputFormat.setOutputPath(job,path);
boolean flag = job.waitForCompletion(true);
System.exit(flag?0:1);
}
}
class FilterMapper extends Mapper<LongWritable, Text, NullWritable,Text> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
if (word.contains("H")){
continue;
}else {
context.write(NullWritable.get(),new Text(word));
}
}
}
}
十一、MapReduce的工作流程:詳細的工作流程
第一步:提交MR作業(yè)資源
1.1、InputFormat生成切片規(guī)劃文件job.split文件
1.2、將整個MR程序的相關(guān)配置項全部封裝到一個job.xml配置文件
1.3、借助jobSummitter提交切片規(guī)劃文件以及配置文件到指定的目錄
第二步:運行MapTask任務(wù)
2.1、通過InputFormat的createRecordReader讀取對應(yīng)切片的kv數(shù)據(jù)。
2.2、通過mapTask的map方法進行kv數(shù)據(jù)的處理。
2.3、調(diào)用context.write方法將map處理完成的kv數(shù)據(jù)寫出,先計算kv數(shù)據(jù)的分區(qū)編號。
2.4、調(diào)用collector收集器將kv數(shù)據(jù)以及分區(qū)寫出到環(huán)形緩沖區(qū)。
2.5、環(huán)形緩沖區(qū)到達一定的閾值之后,先對環(huán)形緩沖區(qū)數(shù)據(jù)進行排序,排好序之后將數(shù)據(jù)一次性溢寫到文件中,清空溢寫的數(shù)據(jù)緩沖區(qū),溢寫可能會發(fā)生多次,也就可能會產(chǎn)生多個溢寫文件,當map任務(wù)運行完成,多個溢寫文件會合并成一個大的溢寫文件spill.out,同時合并大文件需要進行排序。
2.6、溢寫的過程中如果設(shè)置了Combiner,那么溢寫的過程中會進行Combiner操作,Combiner到底什么時機執(zhí)行,不一定,Combiner作用是為了減少了map溢寫的數(shù)據(jù)量以及map向reduce傳輸?shù)臄?shù)據(jù)量。
第三步:運行ReduceTask任務(wù)
3.1、copy階段:先從不同的MapTask上拷貝指定分區(qū)的數(shù)據(jù)到達ReduceTask的節(jié)點內(nèi)存,內(nèi)存放不下,溢寫磁盤文件中。
3.2、merge階段:拷貝數(shù)據(jù)到ReduceTask中,溢寫數(shù)據(jù)的時候會進行合并操作,減少溢寫文件的產(chǎn)生。
3.3、Sort階段:按照指定的分組規(guī)則對數(shù)據(jù)進行聚合,同時對merge合并完成的數(shù)據(jù)進行一次排序。
【注】2.3 —— 3.3 為mapreduce中的shuffle機制
3.4、執(zhí)行Reduce方法,一組相同key調(diào)用一次reduce方法。
第四步:輸出計算結(jié)果
reduce計算完成,調(diào)用context.write方法寫出key value數(shù)據(jù),MR底層會調(diào)用OutputFormat的實現(xiàn)類實現(xiàn)數(shù)據(jù)到文件的寫出
十二、MR程序運行的問題總結(jié)
1、如何在控制臺輸出日志文件
MR程序運行需要在控制臺輸出日志,MR程序控制臺輸出的日志能清晰看到MR程序切片數(shù)量以及MapTask的數(shù)量和ReduceTask的數(shù)量
但是默認情況下控制臺是無法輸出日志的,如果要輸出日志信息,我們需要對代碼進行修改
1、需要在項目的resources目錄引入log4j.properties文件
日志信息輸出文件,文件當中定義了我們?nèi)绾屋敵鋈罩拘畔?
2、引入一個日志框架的依賴,如果沒有這個依賴,那么日志文件不會生效輸出 pom.xml
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
2、運行MR程序報錯HDFS的權(quán)限問題
問題的原因:
MR程序運行過程中需要在HDFS創(chuàng)建目錄,并且向目錄中寫入MR程序運行結(jié)果,但是如果我們是在windows本地運行代碼,MR程序在運行中,會使用windows上的用戶名當作HDFS用戶進行寫操作權(quán)限,但是默認情況下,HDFS上除了root用戶以外,其他用戶基本上都是無權(quán)限寫入的
報錯解決方案:
1、簡單粗暴,但是不安全:給HDFS的根目錄賦予一個777最高權(quán)限,不安全 ---- 禁止大家操作
2、MR程序在運行的時候,指定HDFS的用戶為root用戶而非windows本地的用戶(建議大家使用) —— 見下面詳細的圖文操作
在MR程序的 vm options中增加一個配置項:-DHADOOP_USER_NAME=root
3、在HDFS集群中配置忽略權(quán)限檢查,這個效果等同于第一種設(shè)置的方式hdfs-site.xml 必須在hdfs集群中配置,而非MR代碼中
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
3、當MR程序打成JAR包以后,在Hadoop集群的YARN上運行的時候,報錯ClassNotFoundException: xxxxx.xxMapper
報錯原因:
不是因為類的class文件沒有打包到j(luò)ar包當中,而是因為hadoop運行jar包的時候,不知道如何在JAR包中尋找這個類
解決方案:
只需要讓Hadoop運行jar包能找到類即可,在Driver驅(qū)動程序當中配置一行代碼即可
job.setJarByClass(xxxDriver.class);
4、當MR程序打成JAR包以后,在Hadoop集群的YARN上運行的時候,報錯資源不足的問題
報錯原因:
1、虛擬機的資源太少,MR程序運行的時候,每一個map任務(wù)默認需要1024MB的內(nèi)存
mapred-site.xml
<property>
<name>mapreduce.map.memory.mb</name>
<value>250</value>
</property>
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx250M</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>300</value>
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx300M</value>
</property>
2、資源不足之后,YARN會把一些已經(jīng)分配了資源的MapTask強制殺死,之所以會殺死,是因為YARN會進行資源的檢查,如果不想報這個錯,還有一種方案,關(guān)閉YARN的資源檢測
yarn-site.xml(不建議添加此配置項)
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
【注意】:MR程序的jar包的運行命令如下:
hadoop jar jar包的路徑 jar包中的Driver驅(qū)動程序的全限定類名 參數(shù)1 參數(shù)2 …
十三、MR項目創(chuàng)建使用的細節(jié)問題
1、創(chuàng)建時需要導(dǎo)入的依賴以及相關(guān)配置性問題
導(dǎo)入的依賴
hadoop-client
hadoop-hdfs
slf4j-log4j12:查看MR程序的運行日志
還需要在resources目錄下引入一個log4j.properties文件,文件查看日志
同時還可以在resources目錄引入Hadoop的相關(guān)配置文件:core-site.xml hdfs-site.xml yarn-site.xml mapred-site.xml
如果引入這些配置文件,那么MR程序在運行的時候,配置文件生效的范圍:
Configuration配置文件對象----->resources目錄下引入配置文件----->大數(shù)據(jù)環(huán)境下配置的配置文件(MR程序必須運行在大數(shù)據(jù)集群中,而非windows上,如果是在windows上運行,那么使用的默認配置)
2、MR項目的打包在Hadoop集群運行
#概念
在windows上只是測試運行的,使用的環(huán)境不是大數(shù)據(jù)環(huán)境,因此無法做到分布式運行,如果真的想讓MR程序分布式運行,我們需要將本地編寫好的MR程序打成一個jar包,上傳到Hadoop集群的某個節(jié)點,然后使用
hadoop jar xxx.jar xxx.xxxDriver 運行MR程序
windows的idea打jar包有兩種方式:
1、自己手動生成jar包
file--->project structure---->artifacts--->+--->jar
2、借助maven自動化構(gòu)建工具生成jar包
【注意】如果我們需要在Hadoop集群上運行,那么必須啟動YARN
#復(fù)習(xí)補充知識點
1、MR程序在運行的時候,job提交作業(yè)的時候會自動識別我們的運行環(huán)境,如果我們是在windows本地運行的話,MR程序識別的環(huán)境為LocalRunner這么一個環(huán)境,這個環(huán)境是windows的模擬分布式的環(huán)境,因此我們MR程序基本上都是在windows上測試沒有問題之后,打成jar包,提交給Hadoop集群的YARN進行運行。
2、如果將代碼打成JAR包,部署到大數(shù)據(jù)集群上運行,也不一定是分布式運行,這個得看我們的配置
本地安裝模式:有一個特點,如果是在本地安裝模式下運行,MR程序也不是分布式運行,采用的也是模擬的運行環(huán)境,而非YARN
偽分布式安裝模式、完全分布式安裝模式、HA高可用安裝模式:需要修改配置文件,其中在mapred-site.xml文件中專門配置了MR的運行環(huán)境在YARN上運行的
mapreduce.framework.name yarn模式
如果在三種安裝模式當中,如果沒有配置上述的選項,那么就算YARN啟動成功了,MR程序也不會在YARN上運行,還是使用local本地模擬環(huán)境
手動生成jar包
選擇運行主類
至此手動jar包生成完畢!
將此jar包傳輸?shù)紿adoop集群的環(huán)境下進行運行,并在虛擬機中通過命令運行jar包
借助maven自動化構(gòu)建工具生成jar包
原理:maven是一個自動化構(gòu)建工具,maven工具除了可以幫助我們自動引入第三方編程依賴以外,他還有一個最核心最重要的功能:幫助進行項目的自動化構(gòu)建管理。
maven的生命周期:maven用來管理項目的編譯、測試和打包的
-
如果只運行后面的后面的周期,前面的生命周期也會自動觸發(fā)
-
如果前面的生命周期運行失敗,那么后面的運行周期就無法執(zhí)行
maven每一個生命周期之所以幫助我們做對應(yīng)的操作,是因為maven底層有一些插件,點擊對應(yīng)的生命周期時,調(diào)用底層的默認插件幫助我們完成操作,如果插件打包出現(xiàn)的效果不是我們需要的,那么我們就可以把maven生命周期對應(yīng)的插件給替換了即可。
十四、MapReduce的調(diào)優(yōu)相關(guān)知識點 —— 壓縮機制
MapReduce運行中,可能會產(chǎn)生很多影響MR計算效率的一些問題:數(shù)據(jù)傾斜問題、大量的磁盤IO、小文件過多…
針對磁盤IO問題,MR程序出現(xiàn)了一種壓縮和解壓縮機制,可以解決MR程序運行中涉及到大量磁盤IO的問題
-
壓縮和解壓縮是MR程序提供的一種,在Map輸出或者Reduce輸出,或者Map輸入之前,可以通過指定的壓縮算法對文件或者中間數(shù)據(jù)進行壓縮,這樣的話可以減少磁盤IO的數(shù)據(jù)量,如果我們在map的中間輸出指定了壓縮,那么reduce拉取會數(shù)據(jù)之后,會根據(jù)指定的壓縮機制對壓縮的數(shù)據(jù)進行解壓縮。
-
壓縮機制確實可以提升我們MR程序的運行效率,但是也是有成本的,壓縮因為使用專門的算法,算法越復(fù)雜,壓縮的時候程序的CPU的負載越大。
-
壓縮適用于IO密集的MR程序,計算密集的MR程序不適用
-
常用的壓縮算法的適用場景
- gzip
- 1、壓縮的文件無法被MapReduce切片。
- 2、壓縮效率和壓縮速度都相對而言比較快,如果一個文件壓縮之后在128兆左右的話可以適用這個壓縮機制。
- bzip2
- 1、壓縮的文件支持切片的。
- 2、壓縮效率很高,但是壓縮速度非常慢,如果我們MR程序?qū)r間要求不高,但是數(shù)據(jù)量非常龐大的情況下。
- snappy
- 1、壓縮文件不支持切片。
- 2、壓縮速度非???,是所有壓縮算法中最快的了,壓縮的效率比gzip低。
以上三種Hadoop其實都是支持的,只不過snappy只能大數(shù)據(jù)環(huán)境中使用,無法在windows本地使用。
- lzo
- 1、壓縮的文件支持切片,但是如果要支持切片是非常復(fù)雜的,MR程序支持適用lzo算法,但是MR程序沒有自帶這個算法。
- 2、壓縮效率不高,勝在速度非???。
- 使用比較麻煩的,因為Hadoop沒有自帶這個算法,使用的話得需要下載插件,引入依賴…
- lz4
- 速度比lzo快一點但是不支持切片。
- gzip
-
MapReduce程序可以壓縮數(shù)據(jù)的位置
-
Map的輸入
- 采用一些支持切片的壓縮機制:bzip2、lzo。
- gzip和snappy也可以用,只不過最好保證數(shù)據(jù)壓縮之后在128兆左右。
-
Map的輸出
- snappy機制
-
Reduce的輸出
- 最好也是支持切片的壓縮機制
-
-
在MapReduce中開啟壓縮機制
- 在MR中使用壓縮機制,不需要我們?nèi)ミM行手動的壓縮和解壓縮,只需要在MR的合適的位置指定我們使用的是何種壓縮機制,MR程序會自動的調(diào)用設(shè)置的壓縮和解壓縮算法進行自動化操作。
- Mapper的輸入開啟壓縮
- 只需要在Configuration或者core-site.xml文件增加如下一行配置即可:
配置名:io.compression.codecs
配置值:org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.Lz4Codec,org.apache.hadoop.io.compress.SnappyCodec - 只需要把上述配置配置好,MR程序在處理輸入文件時,如果輸入文件是上述配置的壓縮的后綴。
- 只需要在Configuration或者core-site.xml文件增加如下一行配置即可:
- Mapper的輸出可以開啟壓縮
- mapreduce.map.output.compress true/false
- mapreduce.map.output.compress.codec org.apache.hadoop.io.compress.GzipCodec
- Reduce的輸出可以開啟壓縮
- FileOutputFormat.setCompressOutput(job,true);//是否開啟輸出壓縮
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);//reduce輸出壓縮使用的壓縮機制.
- FileOutputFormat.setCompressOutput(job,true);//是否開啟輸出壓縮
-
可以使用如下命令檢查Hadoop集群目前本身不需要安裝插件就支持的壓縮算法
- hadoop checknative
十五、MapReduce的應(yīng)用場景
1、離線數(shù)據(jù)處理的場景下:數(shù)據(jù)對實時性要求不高(MR程序運行中涉及到大量的磁盤IO和網(wǎng)絡(luò)傳輸,因此會導(dǎo)致MR程序計算效率“不是很高”)。
2、適用于數(shù)據(jù)量比較龐大的文件,小文件操作不占優(yōu)勢,處理TB/PB級別規(guī)模的數(shù)據(jù)。
十六、MapReduce中的優(yōu)化問題
MapReduce雖然是大數(shù)據(jù)中一個分布式計算框架,確實可以計算海量的數(shù)據(jù),但是MR程序在運算過程中涉及到大量的磁盤IO和網(wǎng)絡(luò)傳輸,所以導(dǎo)致MR程序的運行效率相比于其他大數(shù)據(jù)計算框架效率不是很高。
因此開發(fā)MapReduce程序的時候,為了讓MR效率提高一點,可以對MR程序運行過程中的一些問題進行優(yōu)化,盡可能的提升MR的計算效率。
MpReduce導(dǎo)致計算運行緩慢的原因:
- 1、硬件受限制
- 內(nèi)存、CPU、硬盤的IO讀寫速度
- 掏錢解決
- 2、MR運行機制限制
- 數(shù)據(jù)傾斜問題
- MapTask、ReduceTask的任務(wù)數(shù)量設(shè)置
- MR運行過程中小文件過多
- MR運行過程中磁盤溢寫,磁盤IO次數(shù)過多
MapReduce的運行優(yōu)化解決問題:
- Mapper輸入階段優(yōu)化的措施
- 可能產(chǎn)生的問題:小文件過多、數(shù)據(jù)傾斜、某些大文件不可被切割
- 1、小文件過多的問題:CombinerTextInputFormat實現(xiàn)小文件的合并,減少小切片出現(xiàn)。
- 2、文件不可被切割,可以在MR程序處理之前,對文件數(shù)據(jù)重新進行壓縮,壓縮的時候選擇可以被切片的壓縮機制進行壓縮。
- 3、map階段的數(shù)據(jù)傾斜問題:合理的使用切片機制對輸入的數(shù)據(jù)進行切片。
- 4、合理的使用壓縮機制。
- Mapper階段優(yōu)化的措施
- 可能產(chǎn)生的問題:環(huán)形緩沖區(qū)溢寫的次數(shù)過多,溢寫文件的合并次數(shù)過多,溢寫和合并都涉及到磁盤IO。
- 1、溢寫次數(shù)過多,那么加大環(huán)形緩沖區(qū)的容量以及溢寫的閾值。mapred-site.xml/Configuration
mapreduce.task.io.sort.mb 環(huán)形緩沖區(qū)的容量
mapreduce.map.sort.spill.percent 溢寫的比例 小數(shù) - 2、溢寫的小文件并不是只合并一次,如果溢寫的小文件超過設(shè)置的指定數(shù)量,先進行一次合并。
mapreduce.task.io.sort.factor 默認值10 - 3、可以合理的利用的Mapper輸出壓縮,減少Mapper輸出的數(shù)據(jù)量。
- 4、在不干擾MR邏輯運行的前提下,合理的利用的Combiner組件對Map端的數(shù)據(jù)進行局部匯總,可以減少Mapper輸出的數(shù)據(jù)量。
- Reduce階段的優(yōu)化措施
- 產(chǎn)生的問題:reduce的任務(wù)數(shù)設(shè)置不合理,Reduce端的數(shù)據(jù)傾斜問題、Reduce階段拉取數(shù)據(jù)回來之后先寫到內(nèi)存中,內(nèi)存放不下溢寫磁盤(磁盤IO)。
- 1、任務(wù)書設(shè)置和數(shù)據(jù)傾斜問題:可以通過查看MR程序運行的計數(shù)器,自定義分區(qū)機制重新指定分區(qū)規(guī)則。
- 2、盡量不使用Reduce階段。
- 3、MR程序中,默認如果Map任務(wù)運行沒有結(jié)束,那么Reduce任務(wù)就無法運行??梢栽O(shè)置map任務(wù)和reduce任務(wù)共存(map任務(wù)沒有全部運行結(jié)束,reduce也可以開始運行)。
mapreduce.job.reduce.slowstart.completedmaps 0.05 - 4、合理的利用的Reduce端的輸出壓縮、也可以使用SequenceFile文件格式進行數(shù)據(jù)輸出。
MapReduce的重試問題的優(yōu)化:文章來源:http://www.zghlxwxcb.cn/news/detail-697169.html
MapReduce運行過程中,如果某一個Map任務(wù)或者reduce任務(wù)運行失敗,MR并不會直接終止程序的運行,而是會對失敗的map任務(wù)和reduce任務(wù)進行特定次數(shù)的重試,如果特定次數(shù)的重試之后Map和reduce都沒有運行成功,MR才會認為運行失敗。文章來源地址http://www.zghlxwxcb.cn/news/detail-697169.html
mapreduce.map.maxattempts 4
mapreduce.reduce.maxattempts 4
mapreduce.task.timeout 600000
到了這里,關(guān)于Hadoop的第二個核心組件:MapReduce框架第四節(jié)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!