MapReduce Principles and Analysis
What is MapReduce
Preface: if you want to know how many hearts are in a deck of cards, the direct way is to go through the cards one by one and count the hearts.
The MapReduce way is to deal the deck out to all the nodes, have each node count how many hearts are in its own hand, and then sum those partial counts to get the result.
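The analogy can be sketched as a tiny local simulation in plain Java (this is only an illustration of the idea, not actual MapReduce code; the card encoding, e.g. "H2" for the two of hearts, is made up for the example):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Local simulation of the card-counting analogy:
// each "node" maps its hand to a partial count, then a reduce step sums them.
public class HeartCount {
    // map: count hearts (cards starting with "H") in one node's hand
    static int map(List<String> hand) {
        return (int) hand.stream().filter(c -> c.startsWith("H")).count();
    }

    // reduce: sum the partial counts coming from all nodes
    static int reduce(List<Integer> partialCounts) {
        return partialCounts.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<List<String>> hands = Arrays.asList(
                Arrays.asList("H2", "S5", "HK"),  // node 1: two hearts
                Arrays.asList("D7", "H9"),        // node 2: one heart
                Arrays.asList("C3", "S8"));       // node 3: no hearts
        List<Integer> partials = hands.stream()
                .map(HeartCount::map)
                .collect(Collectors.toList());
        System.out.println("hearts = " + reduce(partials)); // prints "hearts = 3"
    }
}
```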
Overview
- Official description: MapReduce is a distributed computing model proposed by Google, originally for the search domain, that solves the problem of computing over massive data sets.
- MapReduce runs in a distributed fashion and consists of two phases: Map and Reduce.
- The MapReduce framework provides default implementations for everything else; users only need to override the map() and reduce() functions to obtain a distributed computation.
Principle Analysis
Execution of the Map phase
- The framework divides the input files into InputSplits; by default each HDFS block corresponds to one InputSplit. A RecordReader then parses each InputSplit into key-value pairs <k1,v1>; by default every line becomes one pair.
- The framework calls the map() function of the Mapper class, which takes <k1,v1> as input and emits <k2,v2>. One InputSplit corresponds to one map task.
- The framework partitions the <k2,v2> pairs emitted by map(). Pairs in different partitions are processed by different reduce tasks; by default there is only one partition.
- Within each partition the framework sorts and groups the data by k2. Grouping means that all v2 values sharing the same k2 are collected into one group.
- On the map side the framework can optionally run a combine (local reduce) step.
- The framework writes each map task's <k2,v2> output to local disk files on the node.
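The default partitioning in the steps above is done by Hadoop's HashPartitioner, whose core logic can be sketched in plain Java as follows (a simplified stand-in operating on String keys, not the actual Hadoop class):

```java
// Sketch of Hadoop's default HashPartitioner logic: a key is assigned to a
// reduce task by its hash code, masked with Integer.MAX_VALUE to stay non-negative.
public class HashPartitionSketch {
    static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // With a single reducer (the default), every key lands in partition 0.
        System.out.println(getPartition("hello", 1)); // prints "0"
        // With several reducers, keys spread across partitions 0..n-1.
        System.out.println(getPartition("hello", 4));
    }
}
```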
Execution of the Reduce phase
- The framework copies the output of the map tasks, partition by partition, over the network to the corresponding reduce nodes; this process is called shuffle.
- On the reduce side, the framework merges, sorts, and groups the <k2,v2> data received for its partition.
- The framework calls the reduce() method of the Reducer class, with input <k2,[v2…]> and output <k3,v3>; reduce() is called once per <k2,[v2…]> group.
- The framework saves the reduce output to HDFS.
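The reduce-side merge/sort/group step above can be sketched locally (a plain-Java simulation of what the framework does, not Hadoop code):

```java
import java.util.*;

// Local sketch of the reduce-side grouping: incoming <k2,v2> pairs are
// sorted by key and grouped, so reduce() sees one <k2,[v2...]> per key.
public class GroupSketch {
    static SortedMap<String, List<Long>> group(List<Map.Entry<String, Long>> pairs) {
        SortedMap<String, List<Long>> grouped = new TreeMap<>(); // TreeMap keeps keys sorted
        for (Map.Entry<String, Long> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> pairs = Arrays.asList(
                new AbstractMap.SimpleEntry<>("hello", 1L),
                new AbstractMap.SimpleEntry<>("world", 1L),
                new AbstractMap.SimpleEntry<>("hello", 1L));
        // Both "hello" values end up in one group, ready for a summing reduce().
        System.out.println(group(pairs)); // prints "{hello=[1, 1], world=[1]}"
    }
}
```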
WordCount Case Analysis
Multi-file WordCount Case Analysis
The Shuffle Process in Detail
Shuffle is a process spanning both the map and reduce phases: it moves the data produced by the map tasks over the network to the reduce tasks.
A WordCount Example with Map and Reduce (and Log Inspection)
Add the dependency
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.14</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hx</groupId>
    <artifactId>hadoopDemo1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hadoopDemo1</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
The code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author Huathy
 * @date 2023-10-21 21:17
 * @description Assembles the job
 */
public class WordCountJob {
    public static void main(String[] args) throws Exception {
        System.out.println("inputPath => " + args[0]);
        System.out.println("outputPath => " + args[1]);
        String path = args[0];
        String path2 = args[1];

        // Configuration parameters needed by the job
        Configuration configuration = new Configuration();
        // Create the job
        Job job = Job.getInstance(configuration, "wordCountJob");
        // Note: this line is required; without it the Job class cannot be found when running on the cluster
        job.setJarByClass(WordCountJob.class);
        // Specify the input and output paths
        FileInputFormat.setInputPaths(job, new Path(path));
        FileOutputFormat.setOutputPath(job, new Path(path2));
        job.setMapperClass(WordMap.class);
        job.setReducerClass(WordReduce.class);
        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Submit the job and wait for completion
        job.waitForCompletion(true);
    }

    /**
     * @author Huathy
     * @date 2023-10-21 21:39
     * @description Custom mapper class; defines the input and output types
     */
    public static class WordMap extends Mapper<LongWritable, Text, Text, LongWritable> {
        /**
         * The map function: accepts keyIn/valueIn and produces keyOut/valueOut.
         *
         * @param k1 byte offset of the line within the file
         * @param v1 content of the line
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // k1 is the offset of the start of the line; v1 is the line's content.
            // Split each line on runs of non-word characters to extract the words
            // (note the backslash must be escaped in a Java string literal).
            String[] words = v1.toString().split("\\W+");
            // Iterate over the extracted words
            for (String word : words) {
                // Wrap each word as a <k2,v2> pair
                Text k2 = new Text(word);
                System.out.println("k2: " + k2.toString());
                LongWritable v2 = new LongWritable(1);
                // Emit <k2,v2>
                context.write(k2, v2);
            }
        }
    }

    /**
     * @author Huathy
     * @date 2023-10-21 22:08
     * @description Custom reducer class
     */
    public static class WordReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        /**
         * Sums the values in v2s and emits the total as <k3,v3>.
         */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
            }
            // Assemble <k3,v3>
            LongWritable v3 = new LongWritable(sum);
            System.out.println("k3: " + k2.toString() + " -- v3: " + v3.toString());
            context.write(k2, v3);
        }
    }
}
Run command and output log
[root@cent7-1 hadoop-3.2.4]# hadoop jar wc.jar WordCountJob hdfs://cent7-1:9000/hello.txt hdfs://cent7-1:9000/out /home/hadoop-3.2.4/wc.jar
inputPath => hdfs://cent7-1:9000/hello.txt
outputPath => hdfs://cent7-1:9000/out
set jar => /home/hadoop-3.2.4/wc.jar
2023-10-22 15:30:34,183 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2023-10-22 15:30:35,183 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2023-10-22 15:30:35,342 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1697944187818_0010
2023-10-22 15:30:36,196 INFO input.FileInputFormat: Total input files to process : 1
2023-10-22 15:30:37,320 INFO mapreduce.JobSubmitter: number of splits:1
2023-10-22 15:30:37,694 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1697944187818_0010
2023-10-22 15:30:37,696 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-10-22 15:30:38,033 INFO conf.Configuration: resource-types.xml not found
2023-10-22 15:30:38,034 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2023-10-22 15:30:38,188 INFO impl.YarnClientImpl: Submitted application application_1697944187818_0010
2023-10-22 15:30:38,248 INFO mapreduce.Job: The url to track the job: http://cent7-1:8088/proxy/application_1697944187818_0010/
2023-10-22 15:30:38,249 INFO mapreduce.Job: Running job: job_1697944187818_0010
2023-10-22 15:30:51,749 INFO mapreduce.Job: Job job_1697944187818_0010 running in uber mode : false
2023-10-22 15:30:51,751 INFO mapreduce.Job: map 0% reduce 0%
2023-10-22 15:30:59,254 INFO mapreduce.Job: map 100% reduce 0%
2023-10-22 15:31:08,410 INFO mapreduce.Job: map 100% reduce 100%
2023-10-22 15:31:09,447 INFO mapreduce.Job: Job job_1697944187818_0010 completed successfully
2023-10-22 15:31:09,578 INFO mapreduce.Job: Counters: 54
File System Counters
FILE: Number of bytes read=129
FILE: Number of bytes written=479187
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=139
HDFS: Number of bytes written=35
HDFS: Number of read operations=8
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
HDFS: Number of bytes read erasure-coded=0
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=4916
Total time spent by all reduces in occupied slots (ms)=5821
Total time spent by all map tasks (ms)=4916
Total time spent by all reduce tasks (ms)=5821
Total vcore-milliseconds taken by all map tasks=4916
Total vcore-milliseconds taken by all reduce tasks=5821
Total megabyte-milliseconds taken by all map tasks=5033984
Total megabyte-milliseconds taken by all reduce tasks=5960704
Map-Reduce Framework
Map input records=4
Map output records=8
Map output bytes=107
Map output materialized bytes=129
Input split bytes=94
Combine input records=0
Combine output records=0
Reduce input groups=5
Reduce shuffle bytes=129
Reduce input records=8
Reduce output records=5
Spilled Records=16
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=259
CPU time spent (ms)=2990
Physical memory (bytes) snapshot=528863232
Virtual memory (bytes) snapshot=5158191104
Total committed heap usage (bytes)=378011648
Peak Map Physical memory (bytes)=325742592
Peak Map Virtual memory (bytes)=2575839232
Peak Reduce Physical memory (bytes)=203120640
Peak Reduce Virtual memory (bytes)=2582351872
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=45
File Output Format Counters
Bytes Written=35
[root@cent7-1 hadoop-3.2.4]#
Viewing MapReduce task logs
- Enable YARN log aggregation, which collects the logs scattered across the NodeManager nodes into one place for easy viewing
- Modify yarn.log-aggregation-enable and yarn.log.server.url in yarn-site.xml
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.log.server.url</name>
<value>http://cent7-1:19888/jobhistory/logs/</value>
</property>
- Start the history server:
sbin/mr-jobhistory-daemon.sh start historyserver
Viewing in the UI
- Visit http://192.168.56.101:8088/cluster and click History
- Click into the Successful list
- Find the successful record; click logs to view the job's logs
Stopping a task on the Hadoop cluster
Pressing Ctrl+C only exits the terminal; it does not stop the task, because the task has already been submitted to Hadoop.
- List tasks:
yarn application -list
- Kill a task:
yarn application -kill [application_Id]
# List currently running applications
[root@cent7-1 hadoop-3.2.4]# yarn application -list
2023-10-22 16:18:38,756 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
application_1697961350721_0002 wordCountJob MAPREDUCE root default ACCEPTED UNDEFINED 0% N/A
# Kill the application
[root@cent7-1 hadoop-3.2.4]# yarn application -kill application_1697961350721_0002
2023-10-22 16:18:55,669 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
Killing application application_1697961350721_0002
2023-10-22 16:18:56,795 INFO impl.YarnClientImpl: Killed application application_1697961350721_0002
Hadoop's Serialization Mechanism
Why serialization matters
As the job counters above suggest, much of a Hadoop job's runtime goes into I/O. When writing Map and Reduce code we use Hadoop's own data types; Hadoop optimizes their serialization to write only the core payload, reducing I/O overhead.
Properties of Hadoop serialization
- Compact: uses storage space efficiently
- Fast: low extra overhead for reads and writes
- Extensible: can transparently read data written in older formats
- Interoperable: supports access from multiple languages
Shortcomings of Java serialization
- Not compact: carries a lot of extra metadata and is unsuitable for random access
- Large on disk: recursively writes out class descriptors for every superclass in the chain
- Poor extensibility: Hadoop's Writable, by contrast, makes user-defined types straightforward
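The compactness difference can be illustrated with the plain JDK (a sketch: a Writable's write()/readFields() ultimately boil down to DataOutput/DataInput calls like the one below, while java.io.Serializable adds class-descriptor overhead):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

// Contrast Writable-style field serialization with standard Java serialization
// for a single long value.
public class CompactDemo {
    // Writable-style: write the raw field bytes only (a long is exactly 8 bytes)
    static int writableStyleSize(long value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeLong(value);
        return bos.size();
    }

    // java.io.Serializable: the stream carries class metadata alongside the value
    static int javaSerializedSize(long value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(Long.valueOf(value));
        }
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("writable-style bytes: " + writableStyleSize(42L)); // prints "writable-style bytes: 8"
        System.out.println("java-serialized bytes: " + javaSerializedSize(42L));
    }
}
```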
The Resource Manager (YARN) in Detail
- YARN currently supports three (job-level) schedulers:
- FIFO Scheduler: first-in, first-out scheduling (production workloads mix real-time and offline jobs, so plain FIFO is usually a poor fit)
- CapacityScheduler: can be seen as a multi-queue version of FIFO; resources are divided into several queues, each internally FIFO
- FairScheduler: multiple queues with resources shared fairly among users; fair scheduling (recommended)
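The active scheduler is selected via yarn.resourcemanager.scheduler.class in yarn-site.xml; for example, to switch to the FairScheduler (a configuration sketch using the stock Hadoop class name):

```xml
<property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>
```

Restart the ResourceManager after changing this setting for it to take effect.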