- 尚硅谷大數(shù)據(jù)技術-教程-學習路線-筆記匯總表【課程資料下載】
- 視頻地址:尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程從入門到精通_嗶哩嗶哩_bilibili
- 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】
- 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【Flink部署】
- 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記03【Flink運行時架構】
- 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記04【】
- 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記05【】
- 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記06【】
- 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記07【】
- 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記08【】
目錄
基礎篇
第04章-Flink部署
P023【023_Flink運行時架構_系統(tǒng)架構】07:13
P024【024_Flink運行時架構_核心概念_并行度】06:45
P025【025_Flink運行時架構_核心概念_并行度設置&優(yōu)先級】18:40
P026【026_Flink運行時架構_核心概念_算子鏈】08:34
P027【027_Flink運行時架構_核心概念_算子鏈演示】17:11
P028【028_Flink運行時架構_核心概念_任務槽】09:52
P029【029_Flink運行時架構_核心概念_任務槽的共享組】07:59
P030【030_Flink運行時架構_核心概念_slot與并行度的關系&演示】21:27
P031【031_Flink運行時架構_提交流程_Standalone會話模式&四張圖】09:49
P032【032_Flink運行時架構_提交流程_Yarn應用模式】05:18
基礎篇
第04章-Flink部署
P023【023_Flink運行時架構_系統(tǒng)架構】07:13
Flink運行時架構——Standalone會話模式為例
P024【024_Flink運行時架構_核心概念_并行度】06:45
- 一個特定算子的子任務(subtask)的個數(shù)被稱之為其并行度(parallelism)。這樣,包含并行子任務的數(shù)據(jù)流,就是并行數(shù)據(jù)流,它需要多個分區(qū)(stream partition)來分配并行任務。一般情況下,一個流程序的并行度,可以認為就是其所有算子中最大的并行度。一個程序中,不同的算子可能具有不同的并行度。
- 例如:如上圖所示,當前數(shù)據(jù)流中有source、map、window、sink四個算子,其中sink算子的并行度為1,其他算子的并行度都為2。所以這段流處理程序的并行度就是2。
P025【025_Flink運行時架構_核心概念_并行度設置&優(yōu)先級】18:40
4.2.1 并行度(Parallelism)
2)并行度的設置
在Flink中,可以用不同的方法來設置并行度,它們的有效范圍和優(yōu)先級別也是不同的。
(1)代碼中設置
我們在代碼中,可以很簡單地在算子后跟著調(diào)用setParallelism()方法,來設置當前算子的并行度:
stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
這種方式設置的并行度,只針對當前算子有效。
另外,我們也可以直接調(diào)用執(zhí)行環(huán)境的setParallelism()方法,全局設定并行度:
env.setParallelism(2);
這樣代碼中所有算子,默認的并行度就都為2了。我們一般不會在程序中設置全局并行度,因為如果在程序中對全局并行度進行硬編碼,會導致無法動態(tài)擴容。
這里要注意的是,由于keyBy不是算子,所以無法對keyBy設置并行度。
(2)提交應用時設置
在使用flink run命令提交應用時,可以增加-p參數(shù)來指定當前應用程序執(zhí)行的并行度,它的作用類似于執(zhí)行環(huán)境的全局設置:
bin/flink run –p 2 –c com.atguigu.wc.SocketStreamWordCount
./FlinkTutorial-1.0-SNAPSHOT.jar
如果我們直接在Web UI上提交作業(yè),也可以在對應輸入框中直接添加并行度。
package com.atguigu.wc;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* TODO DataStream實現(xiàn)Wordcount:讀socket(無界流)
*
* @author
* @version 1.0
*/
public class WordCountStreamUnboundedDemo {
public static void main(String[] args) throws Exception {
// TODO 1.創(chuàng)建執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// IDEA運行時,也可以看到webui,一般用于本地測試
// 需要引入一個依賴 flink-runtime-web
// 在idea運行,不指定并行度,默認就是 電腦的 線程數(shù)
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(3);
// TODO 2.讀取數(shù)據(jù): socket
DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);
// TODO 3.處理數(shù)據(jù): 切換、轉換、分組、聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
.flatMap(
(String value, Collector<Tuple2<String, Integer>> out) -> {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
)
.setParallelism(2)
.returns(Types.TUPLE(Types.STRING,Types.INT))
// .returns(new TypeHint<Tuple2<String, Integer>>() {})
.keyBy(value -> value.f0)
.sum(1);
// TODO 4.輸出
sum.print();
// TODO 5.執(zhí)行
env.execute();
}
}
/**
并行度的優(yōu)先級:
代碼:算子 > 代碼:env > 提交時指定 > 配置文件
*/
并行度優(yōu)先級:代碼:算子 > 代碼:全局env > 提交時指定命令 > 配置文件。
P026【026_Flink運行時架構_核心概念_算子鏈】08:34
4.2.2 算子鏈(Operator Chain)
2)合并算子鏈
在Flink中,并行度相同的一對一(one to one)算子操作,可以直接鏈接在一起形成一個“大”的任務(task),這樣原來的算子就成為了真正任務里的一部分,如下圖所示。每個task會被一個線程執(zhí)行。這樣的技術被稱為“算子鏈”(Operator Chain)。
P027【027_Flink運行時架構_核心概念_算子鏈演示】17:11
package com.atguigu.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* TODO DataStream實現(xiàn)Wordcount:讀socket(無界流)
*
* @author
* @version 1.0
*/
public class OperatorChainDemo {
public static void main(String[] args) throws Exception {
// TODO 1.創(chuàng)建執(zhí)行環(huán)境
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// IDEA運行時,也可以看到webui,一般用于本地測試
// 需要引入一個依賴 flink-runtime-web
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// 在idea運行,不指定并行度,默認就是 電腦的 線程數(shù)
env.setParallelism(1);
// 全局禁用 算子鏈
//env.disableOperatorChaining();
// TODO 2.讀取數(shù)據(jù):socket
DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);
// TODO 3.處理數(shù)據(jù): 切換、轉換、分組、聚合
SingleOutputStreamOperator<Tuple2<String,Integer>> sum = socketDS
//.disableChaining()
.flatMap(
(String value, Collector<String> out) -> {
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
)
.startNewChain()
//.disableChaining()
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1))
.returns(Types.TUPLE(Types.STRING,Types.INT))
.keyBy(value -> value.f0)
.sum(1);
// TODO 4.輸出
sum.print();
// TODO 5.執(zhí)行
env.execute();
}
}
/**
1、算子之間的傳輸關系:
一對一
重分區(qū)
2、算子 串在一起的條件:
1) 一對一
2) 并行度相同
3、關于算子鏈的api:
1)全局禁用算子鏈:env.disableOperatorChaining();
2)某個算子不參與鏈化: 算子A.disableChaining(), 算子A不會與 前面 和 后面的算子 串在一起
3)從某個算子開啟新鏈條: 算子A.startNewChain(), 算子A不與 前面串在一起,從A開始正常鏈化
*/
P028【028_Flink運行時架構_核心概念_任務槽】09:52
4.2.3 任務槽(Task Slots)
P029【029_Flink運行時架構_核心概念_任務槽的共享組】07:59
3)任務對任務槽的共享
默認情況下,F(xiàn)link是允許子任務共享slot的。如果我們保持sink任務并行度為1不變,而作業(yè)提交時設置全局并行度為6,那么前兩個任務節(jié)點就會各自有6個并行子任務,整個流處理程序則有13個子任務。如上圖所示,只要屬于同一個作業(yè),那么對于不同任務節(jié)點(算子)的并行子任務,就可以放到同一個slot上執(zhí)行。所以對于第一個任務節(jié)點source→map,它的6個并行子任務必須分到不同的slot上,而第二個任務節(jié)點keyBy/window/apply的并行子任務卻可以和第一個任務節(jié)點共享slot。
package com.atguigu.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* TODO DataStream實現(xiàn)Wordcount:讀socket(無界流)
*
* @author
* @version 1.0
*/
public class SlotSharingGroupDemo {
public static void main(String[] args) throws Exception {
// TODO 1.創(chuàng)建執(zhí)行環(huán)境
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// IDEA運行時,也可以看到webui,一般用于本地測試
// 需要引入一個依賴 flink-runtime-web
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
// 在idea運行,不指定并行度,默認就是 電腦的 線程數(shù)
env.setParallelism(1);
// TODO 2.讀取數(shù)據(jù):socket
DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);
// TODO 3.處理數(shù)據(jù): 切換、轉換、分組、聚合
SingleOutputStreamOperator<Tuple2<String,Integer>> sum = socketDS
.flatMap(
(String value, Collector<String> out) -> {
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
)
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1)).slotSharingGroup("aaa")
.returns(Types.TUPLE(Types.STRING,Types.INT))
.keyBy(value -> value.f0)
.sum(1);
// TODO 4.輸出
sum.print();
// TODO 5.執(zhí)行
env.execute();
}
}
/**
1、slot特點:
1)均分隔離內(nèi)存,不隔離cpu
2)可以共享:
同一個job中,不同算子的子任務 才可以共享 同一個slot,同時在運行的
前提是,屬于同一個 slot共享組,默認都是“default”
2、slot數(shù)量 與 并行度 的關系
1)slot是一種靜態(tài)的概念,表示最大的并發(fā)上限
并行度是一種動態(tài)的概念,表示 實際運行 占用了 幾個
2)要求: slot數(shù)量 >= job并行度(算子最大并行度),job才能運行
TODO 注意:如果是yarn模式,動態(tài)申請
--> TODO 申請的TM數(shù)量 = job并行度 / 每個TM的slot數(shù),向上取整
比如session: 一開始 0個TaskManager,0個slot
--> 提交一個job,并行度10
--> 10/3,向上取整,申請4個tm,
--> 使用10個slot,剩余2個slot
*/
P030【030_Flink運行時架構_核心概念_slot與并行度的關系&演示】21:27
4.2.4 任務槽和并行度的關系
任務槽和并行度都跟程序的并行執(zhí)行有關,但兩者是完全不同的概念。簡單來說任務槽是靜態(tài)的概念,是指TaskManager具有的并發(fā)執(zhí)行能力,可以通過參數(shù)taskmanager.numberOfTaskSlots進行配置;而并行度是動態(tài)概念,也就是TaskManager運行程序時實際使用的并發(fā)能力,可以通過參數(shù)parallelism.default進行配置。
slot數(shù)量 與 并行度 的關系
? ? 1)slot是一種靜態(tài)的概念,表示最大的并發(fā)上限
? ? ? ?并行度是一種動態(tài)的概念,表示 實際運行 占用了 幾個? ? 2)要求: slot數(shù)量 >= job并行度(算子最大并行度),job才能運行
? ? ? ?TODO 注意:如果是yarn模式,動態(tài)申請
? ? ? ? ?--> TODO 申請的TM數(shù)量 = job并行度 / 每個TM的slot數(shù),向上取整
? ? ? ?比如session: 一開始 0個TaskManager,0個slot
? ? ? ? ?--> 提交一個job,并行度10
? ? ? ? ? ? --> 10/3,向上取整,申請4個tm
? ? ? ? ? ? --> 使用10個slot,剩余2個slot
P031【031_Flink運行時架構_提交流程_Standalone會話模式&四張圖】09:49
4.3 作業(yè)提交流程
4.3.1 Standalone會話模式作業(yè)提交流程
4.3.2 邏輯流圖/作業(yè)圖/執(zhí)行圖/物理流圖
邏輯流圖(StreamGraph)→ 作業(yè)圖(JobGraph)→ 執(zhí)行圖(ExecutionGraph)→ 物理圖(Physical Graph)。
P032【032_Flink運行時架構_提交流程_Yarn應用模式】05:18
4.3.3 Yarn應用模式作業(yè)提交流程文章來源:http://www.zghlxwxcb.cn/news/detail-596961.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-596961.html
到了這里,關于尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記03【Flink運行時架構】的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!