Flink Learning Notes
Preface: today is day four of learning Flink! I studied physical partitioning, and this time I covered the first four, simpler physical partitioners, so call this the "simple partitioning" part!
Tips: I believe things will keep getting better. Tomorrow I'll take on the harder partitioners. Keep going!
II. Flink Unified Stream/Batch API Development
3. Physical Partitioning
3.1 Global Partitioner
This partitioner sends every record to a single instance of the downstream operator (subtask id = 0).
Example: write a Flink program that receives word data from a socket and tags each record with its subtask index, so the partitioning behaviour can be observed.
package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-15 22:54:35
 * @description TODO
 */
public class GlobalPartitioningDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // socketTextStream is a non-parallel source: its parallelism is 1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // Map each input record, appending the index of the subtask that processed it
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(3); // set the parallelism of this operator to 3

        // global(): send every record to the first instance (subtask 0) of the downstream operator
        DataStream<String> global = mapped.global();

        // Sink that prints each record together with the sink subtask index that received it
        global.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + "->" + index);
            }
        });

        env.execute();
    }
}
Result:
hadoop : 1->0
hadoop : 2->0
hadoop : 0->0
spark : 1->0
spark : 2->0
Summary:
- 1- Data processed by multiple parallel subtasks is funneled into the first sink subtask (subtask 0).
- 2- In other words, records processed in many places upstream are merged into a single place downstream.
- 3- getRuntimeContext() is only available inside Rich Functions, which is why the sink in addSink() is deliberately written as a RichSinkFunction.
- 4- Useful when parallel tasks need to share the same state, such as a global counter (see the sketch below).
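A minimal sketch of that counter idea (my own illustration, not from the course code; the class name GlobalCounterDemo and the plain count field are assumptions): because global() funnels every record into sink subtask 0, a single in-memory field is enough to count the whole stream, with no coordination between subtasks. The field is not checkpointed, so this is only a demo of the partitioning behaviour.

package cn.itcast.day04.partition;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class GlobalCounterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // global() sends every record to subtask 0 of the sink,
        // so one sink instance sees the whole stream and can count it locally.
        lines.global().addSink(new RichSinkFunction<String>() {
            private long count = 0L; // a plain field works: only one subtask ever receives data

            @Override
            public void invoke(String value, Context context) throws Exception {
                count++;
                System.out.println("total records so far: " + count
                        + " (subtask " + getRuntimeContext().getIndexOfThisSubtask() + ")");
            }
        }).setParallelism(4); // even with 4 sink subtasks, only subtask 0 ever prints

        env.execute();
    }
}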
3.2 Shuffle Partitioner
Records are distributed randomly and uniformly to the downstream partitions.
Example: write a Flink program that receives word data from a socket and distributes each record randomly and evenly across the partitions.
package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-15 23:26:49
 * @description TODO: write a Flink program that receives word data from a socket and distributes each record randomly and evenly across the partitions
 */
public class ShufflePartitioningDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // socketTextStream is a non-parallel source: its parallelism is 1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // Map with parallelism 1, tagging each record with the map subtask index
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(1);

        // shuffle(): redistribute records randomly and uniformly to the downstream subtasks
        DataStream<String> shuffled = mapped.shuffle();

        shuffled.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + index);
            }
        });

        env.execute();
    }
}
Result (no obvious pattern):
hadoop : 0 -> 0
hadoop : 0 -> 4
flink : 0 -> 6
flink : 0 -> 7
Summary:
- 1- shuffle() distributes records randomly and evenly across the parallel instances of the downstream task (a sketch for checking the balance follows this list).
- 2- This distribution suits scenarios with large data volumes: it spreads the network transfer load and lowers the chance of data skew.
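To make the "evenly" claim checkable, here is a hedged sketch of my own (the class name ShuffleBalanceCheckDemo and the bounded sequence source are assumptions, not from the course): each sink subtask counts the records it receives, and with shuffle() the printed running counts should stay roughly equal.

package cn.itcast.day04.partition;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class ShuffleBalanceCheckDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // A bounded numeric source is enough for the check.
        env.generateSequence(1, 1000)
                .shuffle() // random, roughly uniform redistribution
                .addSink(new RichSinkFunction<Long>() {
                    private long received = 0L; // per-subtask counter

                    @Override
                    public void invoke(Long value, Context context) throws Exception {
                        received++;
                        if (received % 100 == 0) {
                            System.out.println("subtask " + getRuntimeContext().getIndexOfThisSubtask()
                                    + " has received " + received + " records");
                        }
                    }
                })
                .setParallelism(4);

        env.execute();
    }
}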
3.3 Broadcast Partitioner
Each record is sent to every downstream operator instance.
Example: write a Flink program that receives word data from a socket and broadcasts each record to every partition.
package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-15 23:35:59
 * @description TODO
 */
public class BroadcastPartitioningDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // socketTextStream is a non-parallel source: its parallelism is 1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // Map with parallelism 1, tagging each record with the map subtask index
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(1);

        // broadcast(): the upstream operator sends every record to all downstream subtasks
        DataStream<String> broadcasted = mapped.broadcast();

        broadcasted.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + index);
            }
        });

        env.execute();
    }
}
Result:
hadoop : 0 -> 0
hadoop : 0 -> 2
hadoop : 0 -> 1
hadoop : 0 -> 3
hadoop : 0 -> 4
hadoop : 0 -> 6
hadoop : 0 -> 5
hadoop : 0 -> 7
spark : 0 -> 3
spark : 0 -> 2
spark : 0 -> 6
spark : 0 -> 4
spark : 0 -> 0
spark : 0 -> 1
spark : 0 -> 5
spark : 0 -> 7
Summary:
- Every record is broadcast uniformly to all downstream subtasks, so each input appears once per sink subtask (see the sketch below).
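The output above already shows the fan-out: with sink parallelism 8, every input word is delivered 8 times, once to each subtask. A small bounded sketch of my own (the class name BroadcastFanOutDemo is an assumption) that makes the "input count × parallelism" relationship explicit:

package cn.itcast.day04.partition;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BroadcastFanOutDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 3 input records broadcast to 4 sink subtasks => 12 sink invocations in total.
        env.fromElements("hadoop", "spark", "flink")
                .broadcast()
                .addSink(new RichSinkFunction<String>() {
                    @Override
                    public void invoke(String value, Context context) throws Exception {
                        System.out.println(value + " -> subtask "
                                + getRuntimeContext().getIndexOfThisSubtask());
                    }
                })
                .setParallelism(4);

        env.execute();
    }
}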
3.4 Rebalance Partitioner
Records are sent to the downstream tasks one after another in a round-robin fashion.
Example: distribute data round-robin.
package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-15 23:41:46
 * @description TODO: Flink's remedy for data skew: round-robin redistribution via rebalance()
 */
public class RebalanceDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 1) Build the execution environment
         * 2) Use env.generateSequence to create the parallel sequence 0-100
         * 3) Use filter to keep only the numbers greater than 8
         * 4) Use map with a RichMapFunction to build a tuple of (number, subtask index)
         * 5) Inside the RichMapFunction, getRuntimeContext().getIndexOfThisSubtask() returns the subtask index
         * 6) Print and check the result
         */
        //TODO 1) build the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 2) use env.generateSequence to create the parallel sequence 0-100
        DataStream<Long> dataSource = env.generateSequence(0, 100);

        //TODO 3) use filter to keep only the numbers greater than 8
        SingleOutputStreamOperator<Long> filteredDataSource = dataSource.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return aLong > 8;
            }
        });

        // rebalance() redistributes the records round-robin, evening out the skew introduced by the filter
        DataStream<Long> rebalance = filteredDataSource.rebalance();

        //TODO 4) use map with a RichMapFunction to build a tuple of (number, subtask index)
        // This shows which subtask handles each of the 92 remaining records, and how many each one gets
        // (the Spark counterpart for inspecting partitions would be mapPartitionsWithIndex)
        //TODO 5) getRuntimeContext().getIndexOfThisSubtask() returns the subtask index
        SingleOutputStreamOperator<Tuple2<Long, Integer>> tuple2MapOperator = rebalance.map(new RichMapFunction<Long, Tuple2<Long, Integer>>() {
            @Override
            public Tuple2<Long, Integer> map(Long aLong) throws Exception {
                return Tuple2.of(aLong, getRuntimeContext().getIndexOfThisSubtask());
            }
        });

        //TODO 6) print and check the result
        tuple2MapOperator.print();

        env.execute();
    }
}
Result (the subtask indices cycle in round-robin order):
0-0
0-1
0-2
0-0
0-1
0-2
Summary:
- 1- rebalance() sends records to the downstream subtasks in round-robin order.
- 2- It is a remedy for data skew (a follow-up sketch below makes the effect visible).
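A follow-up sketch of my own (the class name RebalanceCountDemo is an assumption, not from the course): the filter leaves the source subtask that generated 0-8 with fewer records than the others, and rebalance() evens the load back out before the map. Toggle the rebalance() call to compare the per-subtask counts.

package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RebalanceCountDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Skewed input: after the filter, the subtask that produced 0..8 has 9 fewer records.
        SingleOutputStreamOperator<Long> skewed = env.generateSequence(0, 100)
                .filter(x -> x > 8);

        // Count how many records each downstream subtask processes.
        // Without rebalance() the skew from the filter is passed on;
        // with it, every subtask ends up with roughly 92 / 3 records.
        skewed.rebalance()
                .map(new RichMapFunction<Long, Tuple2<Integer, Long>>() {
                    private long processed = 0L; // running count per map subtask

                    @Override
                    public Tuple2<Integer, Long> map(Long value) throws Exception {
                        processed++;
                        return Tuple2.of(getRuntimeContext().getIndexOfThisSubtask(), processed);
                    }
                })
                .print(); // the last tuple printed per subtask index shows its total count

        env.execute();
    }
}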