国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上)

這篇具有很好參考價值的文章主要介紹了flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Flink學(xué)習(xí)筆記

前言:今天是學(xué)習(xí)flink的第四天啦!學(xué)習(xí)了物理分區(qū)的知識點,這一次學(xué)習(xí)了前4個簡單的物理分區(qū),稱之為簡單分區(qū)篇!
Tips:我相信自己會越來會好的,明天攻克困難分區(qū)篇,加油!

二、Flink 流批一體 API 開發(fā)

3. 物理分區(qū)

3.1 Global Partitioner

該分區(qū)器會將所有的數(shù)據(jù)都發(fā)送到下游的某個算子實例(subtask id = 0)

flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上),Flink重溫筆記,flink,筆記,大數(shù)據(jù),學(xué)習(xí)方法,數(shù)據(jù)倉庫

實例:編寫Flink程序,接收socket的單詞數(shù)據(jù),以進(jìn)程標(biāo)記查看分區(qū)數(shù)據(jù)情況。

package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-15 22:54:35
 * @description TODO
 */
public class GlobalPartitioningDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Source是一個非并行的Source
        //并行度是1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // 對每個輸入的數(shù)據(jù)進(jìn)行映射處理,給每個單詞添加上一個字符串以及當(dāng)前所在的子任務(wù)編號
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(3); // 針對算子將并行度設(shè)置為 3;

        // 對數(shù)據(jù)流進(jìn)行 global,將其隨機均勻地劃分到每個分區(qū)中
        DataStream<String> global = mapped.global();

        // 定義一個 sink 函數(shù),輸出每個單詞和所在的子任務(wù)編號
        global.addSink(new RichSinkFunction<String>(){
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + "->" + index);
            }
        });

        env.execute();
    }
}

結(jié)果:

hadoop : 1->0
hadoop : 2->0
hadoop : 0->0
spark : 1->0
spark : 2->0

總結(jié):

  • 1- 多個進(jìn)程處理的數(shù)據(jù),匯總到 sink 第一個分區(qū)第一個進(jìn)程
  • 2- 數(shù)據(jù)多出梳理,合并一處的現(xiàn)象
  • 3- getRuntimeContext()方法在 Rich Function 中,最后的 addSink()用心良苦!
  • 4- 并行任務(wù)之間共享相同狀態(tài)的場景,如全局計數(shù)器等
3.2 Shuffer Partition

根據(jù)均勻分布隨機劃分元素。

flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上),Flink重溫筆記,flink,筆記,大數(shù)據(jù),學(xué)習(xí)方法,數(shù)據(jù)倉庫

實例:編寫Flink程序,接收socket的單詞數(shù)據(jù),并將每個字符串均勻的隨機劃分到每個分區(qū)。

package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-15 23:26:49
 * @description TODO:編寫Flink程序,接收socket的單詞數(shù)據(jù),并將每個字符串均勻的隨機劃分到每個分區(qū)
 */
public class ShufflePartitioningDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Source是一個非并行的Source
        //并行度是1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        //并行度2
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(1);

        //shuffle?。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。?/span>
        DataStream<String> shuffled = mapped.shuffle();

        shuffled.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + index);
            }
        });
        env.execute();
    }
}

結(jié)果:

結(jié)果現(xiàn)象:(沒有規(guī)律)
           hadoop : 0 -> 0
           hadoop : 0 -> 4
           flink  : 0 -> 6
           flink  : 0 -> 7

總結(jié):

  • 1- 它將數(shù)據(jù)均勻地分配到下游任務(wù)的每個并行實例中,然后再對每個并行任務(wù)的數(shù)據(jù)進(jìn)行分區(qū)
  • 2- 這種分發(fā)方式適用于數(shù)據(jù)量比較大的場景,可以減少網(wǎng)絡(luò)傳輸壓力和降低數(shù)據(jù)傾斜的概率。
3.3 Broadcast Partition

發(fā)送到下游所有的算子實例

flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上),Flink重溫筆記,flink,筆記,大數(shù)據(jù),學(xué)習(xí)方法,數(shù)據(jù)倉庫

實例:編寫Flink程序,接收socket的單詞數(shù)據(jù),并將每個字符串廣播到每個分區(qū)。

package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-15 23:35:59
 * @description TODO
 */
public class BroadcastPartitioningDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        //Source是一個非并行的Source
        //并行度是1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        //并行度2
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(1);

        //廣播,上游的算子將一個數(shù)據(jù)廣播到下游所以的subtask
        DataStream<String> shuffled = mapped.broadcast();

        shuffled.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + index);
            }
        });

        env.execute();
    }
}

結(jié)果:

hadoop : 0 -> 0
hadoop : 0 -> 2
hadoop : 0 -> 1
hadoop : 0 -> 3
hadoop : 0 -> 4
hadoop : 0 -> 6
hadoop : 0 -> 5
hadoop : 0 -> 7
spark : 0 -> 3
spark : 0 -> 2
spark : 0 -> 6
spark : 0 -> 4
spark : 0 -> 0
spark : 0 -> 1
spark : 0 -> 5
spark : 0 -> 7

總結(jié):

  • 均勻廣播數(shù)據(jù)
3.4 Rebalance Partition

通過循環(huán)的方式依次發(fā)送到下游的task

flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上),Flink重溫筆記,flink,筆記,大數(shù)據(jù),學(xué)習(xí)方法,數(shù)據(jù)倉庫

實例:輪詢發(fā)送數(shù)據(jù)

package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-15 23:41:46
 * @description TODO: flink的數(shù)據(jù)傾斜解決方案:輪詢發(fā)送(當(dāng)設(shè)置并行度為1時)
 */
public class RebalanceDemo {
    public static void main(String[] args) throws Exception {
        /**
         * ?構(gòu)建批處理運行環(huán)境
         * ?使用 env.generateSequence 創(chuàng)建0-100的并行數(shù)據(jù)
         * ?使用 fiter 過濾出來 大于8 的數(shù)字
         * ?使用map操作傳入 RichMapFunction ,將當(dāng)前子任務(wù)的ID和數(shù)字構(gòu)建成一個元組
         * ?在RichMapFunction中可以使用 getRuntimeContext.getIndexOfThisSubtask 獲取子任務(wù)序號
         * ?打印測試
         */
        //TODO ?構(gòu)建批處理運行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO ?使用 env.generateSequence 創(chuàng)建0-100的并行數(shù)據(jù)
        DataStream<Long> dataSource = env.generateSequence(0, 100);

        //TODO ?使用 fiter 過濾出來 大于8 的數(shù)字
        SingleOutputStreamOperator<Long> filteredDataSource = dataSource.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return aLong > 8;
            }
        });

        //解決數(shù)據(jù)傾斜的問題
        DataStream<Long> rebalance = filteredDataSource.rebalance();

        //TODO ?使用map操作傳入 RichMapFunction ,將當(dāng)前子任務(wù)的ID和數(shù)字構(gòu)建成一個元組
        //查看92條數(shù)據(jù)分別被哪些線程處理的,可以看到每個線程處理的數(shù)據(jù)條數(shù)
        //spark中查看數(shù)據(jù)屬于哪個分區(qū)使用哪個函數(shù)?mapPartitionWithIndex
        //TODO ?在RichMapFunction中可以使用 getRuntimeContext.getIndexOfThisSubtask 獲取子任務(wù)序號
        SingleOutputStreamOperator<Tuple2<Long, Integer>> tuple2MapOperator = rebalance.map(new RichMapFunction<Long, Tuple2<Long, Integer>>() {
            @Override
            public Tuple2<Long, Integer> map(Long aLong) throws Exception {
                return Tuple2.of(aLong, getRuntimeContext().getIndexOfThisSubtask());
            }
        });

        //TODO ?打印測試
        tuple2MapOperator.print();

        env.execute();
    }
}

結(jié)果:

 * 0-0
 * 0-1
 * 0-2
 * 0-0
 * 0-1
 * 0-2

總結(jié):文章來源地址http://www.zghlxwxcb.cn/news/detail-825254.html

  • 1- 輪詢發(fā)送數(shù)據(jù)
  • 2- 解決數(shù)據(jù)傾斜問題

到了這里,關(guān)于flink重溫筆記(四):Flink 流批一體 API 開發(fā)——物理分區(qū)(上)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink流批一體計算(16):PyFlink DataStream API

    Flink流批一體計算(16):PyFlink DataStream API

    目錄 概述 Pipeline Dataflow 代碼示例WorldCount.py 執(zhí)行腳本W(wǎng)orldCount.py 概述 Apache Flink 提供了 DataStream API,用于構(gòu)建健壯的、有狀態(tài)的流式應(yīng)用程序。它提供了對狀態(tài)和時間細(xì)粒度控制,從而允許實現(xiàn)高級事件驅(qū)動系統(tǒng)。 用戶實現(xiàn)的Flink程序是由Stream和Transformation這兩個基本構(gòu)建塊組

    2024年02月11日
    瀏覽(25)
  • Flink流批一體計算(10):PyFlink Tabel API

    簡述 PyFlink 是 Apache Flink 的 Python API ,你可以使用它構(gòu)建可擴(kuò)展的批處理和流處理任務(wù),例如實時數(shù)據(jù)處理管道、大規(guī)模探索性數(shù)據(jù)分析、機器學(xué)習(xí)( ML )管道和 ETL 處理。 如果你對 Python 和 Pandas 等庫已經(jīng)比較熟悉,那么 PyFlink 可以讓你更輕松地利用 Flink 生態(tài)系統(tǒng)的全部功

    2024年02月11日
    瀏覽(27)
  • Flink流批一體計算(20):DataStream API和Table API互轉(zhuǎn)

    目錄 舉個例子 連接器 下載連接器(connector)和格式(format)jar 包 依賴管理 ?如何使用連接器 舉個例子 StreamExecutionEnvironment 集成了DataStream API,通過額外的函數(shù)擴(kuò)展了TableEnvironment。 下面代碼演示兩種API如何互轉(zhuǎn) TableEnvironment 將采用StreamExecutionEnvironment所有的配置選項。 建

    2024年02月10日
    瀏覽(24)
  • Flink流批一體計算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目錄 StreamExecutionEnvironment Watermark watermark策略簡介 使用 Watermark 策略 內(nèi)置水印生成器 處理空閑數(shù)據(jù)源 算子處理 Watermark 的方式 創(chuàng)建DataStream的方式 通過list對象創(chuàng)建 ??????使用DataStream connectors創(chuàng)建 使用Table SQL connectors創(chuàng)建 StreamExecutionEnvironment 編寫一個 Flink Python DataSt

    2024年02月11日
    瀏覽(55)
  • Flink流批一體計算(11):PyFlink Tabel API之TableEnvironment

    目錄 概述 設(shè)置重啟策略 什么是flink的重啟策略(Restartstrategy) flink的重啟策略(Restartstrategy)實戰(zhàn) flink的4種重啟策略 FixedDelayRestartstrategy(固定延時重啟策略) FailureRateRestartstrategy(故障率重啟策略) NoRestartstrategy(不重啟策略) 配置State Backends 以及 Checkpointing Checkpoint 啟用和配置

    2024年02月13日
    瀏覽(47)
  • Flink流批一體計算(12):PyFlink Tabel API之構(gòu)建作業(yè)

    目錄 1.創(chuàng)建源表和結(jié)果表。 創(chuàng)建及注冊表名分別為 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通過 DDL 語句來注冊源表和結(jié)果表 2. 創(chuàng)建一個作業(yè) 3. 提交作業(yè)Submitting PyFlink Jobs 1.創(chuàng)建源表和結(jié)果表。 創(chuàng)建及注冊表名分別為 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    瀏覽(21)
  • Flink流批一體計算(19):PyFlink DataStream API之State

    目錄 keyed state Keyed DataStream 使用 Keyed State 實現(xiàn)了一個簡單的計數(shù)窗口 狀態(tài)有效期 (TTL) 過期數(shù)據(jù)的清理 全量快照時進(jìn)行清理 增量數(shù)據(jù)清理 在 RocksDB 壓縮時清理 Operator State算子狀態(tài) Broadcast State廣播狀態(tài) keyed state Keyed DataStream 使用 keyed state,首先需要為DataStream指定 key(主鍵)

    2024年02月10日
    瀏覽(43)
  • Flink流批一體計算(14):PyFlink Tabel API之SQL查詢

    舉個例子 查詢 source 表,同時執(zhí)行計算 Table API 查詢 Table 對象有許多方法,可以用于進(jìn)行關(guān)系操作。 這些方法返回新的 Table 對象,表示對輸入 Table 應(yīng)用關(guān)系操作之后的結(jié)果。 這些關(guān)系操作可以由多個方法調(diào)用組成,例如 table.group_by(...).select(...)。 Table API 文檔描述了流和批

    2024年02月12日
    瀏覽(23)
  • Flink流批一體計算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 創(chuàng)建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用來: ·創(chuàng)建 Table ·將 Table 注冊成臨時表 ·執(zhí)行 SQL 查詢 ·注冊用戶自定義的 (標(biāo)量,表值,或者聚合) 函數(shù) ·配置作業(yè) ·管理 Python 依賴 ·提交作業(yè)執(zhí)行 創(chuàng)建 source 表 創(chuàng)建 sink

    2024年02月12日
    瀏覽(23)
  • Flink流批一體計算(18):PyFlink DataStream API之計算和Sink

    Flink流批一體計算(18):PyFlink DataStream API之計算和Sink

    目錄 1. 在上節(jié)數(shù)據(jù)流上執(zhí)行轉(zhuǎn)換操作,或者使用 sink 將數(shù)據(jù)寫入外部系統(tǒng)。 2. File Sink File Sink Format Types? Row-encoded Formats? Bulk-encoded Formats? 桶分配 滾動策略 3. 如何輸出結(jié)果 Print 集合數(shù)據(jù)到客戶端,execute_and_collect方法將收集數(shù)據(jù)到客戶端內(nèi)存 將結(jié)果發(fā)送到DataStream sink conne

    2024年02月11日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包