国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink多流轉(zhuǎn)換(1)—— 分流&合流

這篇具有很好參考價值的文章主要介紹了Flink多流轉(zhuǎn)換(1)—— 分流&合流。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

分流

代碼示例

使用側(cè)輸出流

合流

聯(lián)合(Union)

連接(Connect)


簡單劃分的話,多流轉(zhuǎn)換可以分為“分流”和“合流”兩大類

目前分流的操作一般是通過側(cè)輸出流(side output)來實現(xiàn),而合流的算子比較豐富,根據(jù)不同的需求可以調(diào)用 union、connect、join 以及 coGroup 等接口進行連接合并操作

分流

將一條數(shù)據(jù)流拆分成完全獨立的兩條、甚至多條流。也就是基于一個DataStream,得到完全平等的多個子DataStream

Flink多流轉(zhuǎn)換(1)—— 分流&合流,Flink,大數(shù)據(jù),flink,java,python,大數(shù)據(jù)

代碼示例

調(diào)用.filter()方法進行篩選,將符合條件的數(shù)據(jù)揀選出來放到對應(yīng)的流里

public class SplitStreamByFilter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());
        
        // 篩選Mary的瀏覽行為放入MaryStream流中
        DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Mary");
            }
        });
        
        // 篩選Bob的購買行為放入BobStream流中
        DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Bob");
            }
        });
        
        // 篩選其他人的瀏覽行為放入elseStream流中
        DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return !value.user.equals("Mary") && !value.user.equals("Bob") ;
            }
        });

        MaryStream.print("Mary pv");
        BobStream.print("Bob pv");
        elseStream.print("else pv");
        
        env.execute();
    }
}

缺點:上述操作將原始流復(fù)制了三份,對每一份分別進行篩選,因此代碼冗余,不夠高效

解決:①.split()方法(但限制了數(shù)據(jù)類型轉(zhuǎn)換,已經(jīng)廢棄)

②測輸出流

使用側(cè)輸出流

改進后的代碼如下:

public class SplitStreamByOutputTag {
    // 定義輸出標(biāo)簽,側(cè)輸出流的數(shù)據(jù)類型為三元組(user, url, timestamp)
    private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv"){};
    private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv"){};
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("Mary")){
                    ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else if (value.user.equals("Bob")){
                    ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else {
                    out.collect(value);
                }
            }
        });

        processedStream.getSideOutput(MaryTag).print("Mary pv");
        processedStream.getSideOutput(BobTag).print("Bob pv");
        processedStream.print("else");

        env.execute();
    }
}

①定義OutputTag作為標(biāo)簽

②使用ctx.output方法將符合篩選條件的數(shù)據(jù)寫入側(cè)輸出流

③使用getSideOutput方法從側(cè)輸出流中獲得數(shù)據(jù)

合流

對于來源不同的多條流中的數(shù)據(jù)進行聯(lián)合處理(與分流相比,合流操作更為普遍)

聯(lián)合(Union)

直接將多條流合在一起(要求必須流中的數(shù)據(jù)類型必須相同),合并之后的新流會包括所有流中的元素,數(shù)據(jù)類型不變

Flink多流轉(zhuǎn)換(1)—— 分流&合流,Flink,大數(shù)據(jù),flink,java,python,大數(shù)據(jù)

操作:基于 DataStream 直接調(diào)用.union()方法

參數(shù):其他 DataStream

返回值:一個 DataStream

stream1.union(stream2, stream3, ...)

水位線時效性:多流合并時處理的時效性是以最慢的那個流為準(zhǔn)的(多條流的合并,某種意義上也可以看作是多個并行任務(wù)向同一個下游任務(wù)匯合的過程)

代碼示例:

public class UnionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);



        SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop102", 7777)
                .map(data -> {
                    String[] field = data.split(",");
                    return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        stream1.print("stream1");

        SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop103", 7777)
                .map(data -> {
                    String[] field = data.split(",");
                    return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        stream2.print("stream2");

        // 合并兩條流
        stream1.union(stream2)
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("水位線:" + ctx.timerService().currentWatermark());
                    }
                })
                .print();


        env.execute();
    }
}

測試:

分別在兩臺機器上輸入以下數(shù)據(jù):

hadoop102 :Alice, ./home, 1000
hadoop103 :Alice, ./home, 2000
hadoop102 :Alice, ./home, 3000

水位線的推進如下:

Flink多流轉(zhuǎn)換(1)—— 分流&合流,Flink,大數(shù)據(jù),flink,java,python,大數(shù)據(jù)

連接(Connect)

連接操作允許流的數(shù)據(jù)類型不同

連接流(ConnectedStreams)

連接流可以看成是兩條流形式上的“統(tǒng)一”,被放在了一個同一個流中;事實上內(nèi)部仍保持各自的數(shù)據(jù)形式不變,彼此之間是相互獨立的

要想得到新的 DataStream,還需要進一步定義一個“同處理”(co-process)轉(zhuǎn)換操作,用來說明對于不同來源、不同類型的數(shù)據(jù),怎樣分別進行處理轉(zhuǎn)換、得到統(tǒng)一的輸出類型

Flink多流轉(zhuǎn)換(1)—— 分流&合流,Flink,大數(shù)據(jù),flink,java,python,大數(shù)據(jù)

代碼實現(xiàn)

①基于一條 DataStream 調(diào)用.connect()方法,傳入另外一條 DataStream 作為參數(shù),將兩條流連接起來,得到一個 ConnectedStreams

②調(diào)用同處理方法得到 DataStream(可以的調(diào)用的同處理方法有.map()/.flatMap(),以及.process()方法)

public class ConnectTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Integer> stream1 = env.fromElements(1,2,3);
        DataStream<Long> stream2 = env.fromElements(1L,2L,3L);

        ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);
        SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {
            @Override
            public String map1(Integer value) {
                return "Integer: " + value;
            }

            @Override
            public String map2(Long value) {
                return "Long: " + value;
            }
        });

        result.print();

        env.execute();
    }
}

①ConnectedStreams有兩個類型參數(shù),分別是stream1和stream2的類型;

②map方法中實現(xiàn)了一個CoMapFunction,表示分別對兩條流中的數(shù)據(jù)執(zhí)行 map 操作

類型參數(shù)<IN1, IN2, OUT>,分別表示第一條流、第二條流,以及合并后的流中的數(shù)據(jù)類型

這里我們將一條 Integer 流和一條 Long 流合并,轉(zhuǎn)換成 String 輸出。所以當(dāng)遇到第一條流輸入的整型值時,調(diào)用.map1();而遇到第二條流輸入的長整型數(shù)據(jù)時,調(diào)用.map2():最終都轉(zhuǎn)換為字符串輸出,合并成了一條字符串流

③補充:ConnectedStreams 也可以直接調(diào)用.keyBy()進行按鍵分區(qū)的操作,得到的還是一個 ConnectedStreams

connectedStreams.keyBy(keySelector1, keySelector2);

傳入的參數(shù)是兩條流中各自的鍵選擇器

這樣的操作就是把兩條流中key相同的數(shù)據(jù)放到了一起,然后針對來源的流各自進行處理;

同樣也可以在合并之前先使用KeyBy進行分區(qū),然后基于兩條KeyedStream進行連接操作;

要注意兩條流定義的鍵的類型必須相同,否則會拋出異常


CoProcessFunction

對于連接流 ConnectedStreams 的處理操作,需要分別定義對兩條流的處理轉(zhuǎn)換,因此接口中就會有兩個相同的方法需要實現(xiàn),用數(shù)字“1”“2”區(qū)分,在兩條流中的數(shù)據(jù)到來時分別調(diào)用。我們把這種接口叫作“協(xié)同處理函數(shù)”(co-process function)

例如:CoMapFunction、CoFlatMapFunction、CoProcessFunction

CoProcessFunction源碼如下:

public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
...
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}

public abstract class Context {...}
...
}

簡單示例:實現(xiàn)一個實時對賬的需求,也就是app 的支付操作和第三方的支付操作的一個雙流 Join。App 的支付事件和第三方的支付事件將會互相等待 5 秒鐘,如果等不來對應(yīng)的支付事件,那么就輸出報警信息

// 實時對賬
public class BillCheckExample {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 來自app的支付日志
        SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env.fromElements(
                Tuple3.of("order-1", "app", 1000L),
                Tuple3.of("order-2", "app", 2000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                })
        );

        // 來自第三方支付平臺的支付日志
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env.fromElements(
                Tuple4.of("order-1", "third-party", "success", 3000L),
                Tuple4.of("order-3", "third-party", "success", 4000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {
                        return element.f3;
                    }
                })
        );

        // 檢測同一支付單在兩條流中是否匹配,不匹配就報警
        appStream.connect(thirdpartStream)
                .keyBy(data -> data.f0, data -> data.f0)
                .process(new OrderMatchResult())
                .print();

        env.execute();
    }

    // 自定義實現(xiàn)CoProcessFunction
    public static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>{
        // 定義狀態(tài)變量,用來保存已經(jīng)到達的事件
        private ValueState<Tuple3<String, String, Long>> appEventState;
        private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;

        @Override
        public void open(Configuration parameters) throws Exception {
            appEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
            );

            thirdPartyEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
            );
        }

        @Override
        public void processElement1(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            // 看另一條流中事件是否來過
            if (thirdPartyEventState.value() != null){
                out.collect("對賬成功:" + value + "  " + thirdPartyEventState.value());
                // 清空狀態(tài)
                thirdPartyEventState.clear();
            } else {
                // 更新狀態(tài)
                appEventState.update(value);
                // 注冊一個5秒后的定時器,開始等待另一條流的事件
                ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
            }
        }

        @Override
        public void processElement2(Tuple4<String, String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            if (appEventState.value() != null){
                out.collect("對賬成功:" + appEventState.value() + "  " + value);
                // 清空狀態(tài)
                appEventState.clear();
            } else {
                // 更新狀態(tài)
                thirdPartyEventState.update(value);
                // 注冊一個5秒后的定時器,開始等待另一條流的事件
                ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // 定時器觸發(fā),判斷狀態(tài),如果某個狀態(tài)不為空,說明另一條流中事件沒來
            if (appEventState.value() != null) {
                out.collect("對賬失敗:" + appEventState.value() + "  " + "第三方支付平臺信息未到");
            }
            if (thirdPartyEventState.value() != null) {
                out.collect("對賬失?。? + thirdPartyEventState.value() + "  " + "app信息未到");
            }
            appEventState.clear();
            thirdPartyEventState.clear();
        }
    }}

運行結(jié)果如下:

Flink多流轉(zhuǎn)換(1)—— 分流&合流,Flink,大數(shù)據(jù),flink,java,python,大數(shù)據(jù)

運行結(jié)果解析:
①在CoProcessFunction的實現(xiàn)中,聲明了兩個狀態(tài)變量用來保存App的支付信息和第三方的支付信息

②App支付信息到達之后,觸發(fā)processElement1中的操作,檢查第三方的支付信息是否已經(jīng)到達(如果先到達會保存在相應(yīng)的狀態(tài)變量中);如果已經(jīng)到達,則對賬成功;如果沒有到達,則等待5s,仍未到達則對賬失敗;

③第三方支付信息到達后,流程同②

④對于order-1,時間戳為1000的數(shù)據(jù)(App)到達后,第三方支付信息未到達,等待5s,接著時間戳未3000的數(shù)據(jù)(第三方)到達后,發(fā)現(xiàn)App支付信息已經(jīng)到達,因此對賬成功

⑤對于order-2和order-3,均是等待5s后沒有檢測到App(第三方)數(shù)據(jù)到達而發(fā)出報警信息

廣播連接流(BroadcastConnectedStream)

DataStream 調(diào)用.connect()方法時可以傳入一個廣播流(BroadcastConnectedStream)

這種連接方式往往用在需要動態(tài)定義某些規(guī)則或配置的場景。因為規(guī)則是實時變動的,所以我們可以用一個單獨的流來獲取規(guī)則數(shù)據(jù);而這些規(guī)則或配置是對整個應(yīng)用全局有效的,所以不能只把這數(shù)據(jù)傳遞給一個下游并行子任務(wù)處理,而是要“廣播”(broadcast)給所有的并行子任務(wù)。而下游子任務(wù)收到廣播出來的規(guī)則,會把它保存成一個狀態(tài),這就是所謂的“廣播狀態(tài)”(broadcast state)

如何創(chuàng)建廣播流


基于DataStream調(diào)用.broadcast()方法,傳入一個“映射狀態(tài)描述器”(MapStateDescriptor),說明狀態(tài)的名稱和類型;

因為廣播狀態(tài)底層是用一個“映射”(map)結(jié)構(gòu)來保存的

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
數(shù)據(jù)流和廣播流的連接

得到“廣播連接流”(BroadcastConnectedStream),然后基于廣播連接流調(diào)用.process()方法,就可以同時獲取規(guī)則和數(shù)據(jù),進行動態(tài)處理

DataStream<String> output = stream
 .connect(ruleBroadcastStream)
 .process( new BroadcastProcessFunction<>() {...} );
BroadcastProcessFunction

BroadcastProcessFunction 與 CoProcessFunction 類似,同樣是一個抽象類,需要實現(xiàn)兩個方法,針對合并的兩條流中元素分別定義處理操作。區(qū)別在于這里一條流是正常處理數(shù)據(jù),而另一條流則是要用新規(guī)則來更新廣播狀態(tài),所以對應(yīng)的兩個方法叫作.processElement().processBroadcastElement()

學(xué)習(xí)課程鏈接:【尚硅谷】Flink1.13實戰(zhàn)教程(涵蓋所有flink-Java知識點)_嗶哩嗶哩_bilibili?文章來源地址http://www.zghlxwxcb.cn/news/detail-821336.html

到了這里,關(guān)于Flink多流轉(zhuǎn)換(1)—— 分流&合流的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 《Flink學(xué)習(xí)筆記》——第九章 多流轉(zhuǎn)換

    《Flink學(xué)習(xí)筆記》——第九章 多流轉(zhuǎn)換

    無論是基本的簡單轉(zhuǎn)換和聚合,還是基于窗口的計算,我們都是針對一條流上的數(shù)據(jù)進行處理的。而在實際應(yīng)用中,可能需要將不同來源的數(shù)據(jù)連接合并在一起處理,也有可能需要將一條流拆分開,所以經(jīng)常會有對多條流進行處理的場景 簡單劃分(兩大類): 分流——把一

    2024年02月11日
    瀏覽(33)
  • Flink學(xué)習(xí)——處理函數(shù)ProcessFunction及多流轉(zhuǎn)換

    Flink學(xué)習(xí)——處理函數(shù)ProcessFunction及多流轉(zhuǎn)換

    ? ? ? ? 在DataStream的更底層,我們可以不定義任何具體的算子(如map(),filter()等)二只提煉出一個統(tǒng)一的“處理”(process)操作?。它是所有轉(zhuǎn)換算子的概括性的表達。可以自定義處理邏輯。 ? ? ? ? 所以這一層接口就被叫做“ 處理函數(shù) ”( process function ) ? ? ? ? 處理

    2024年02月14日
    瀏覽(17)
  • Flink-多流轉(zhuǎn)換(Union、Connect、Join)

    Flink-多流轉(zhuǎn)換(Union、Connect、Join)

    無論是基本的簡單轉(zhuǎn)換和聚合,還是基于窗口的計算,我們都是針對一條流上的數(shù)據(jù)進行處理的。而在實際應(yīng)用中,可能需要將不同來源的數(shù)據(jù)連接合并在一起處理,也有可能需要將一條流拆分開,所以經(jīng)常會有對多條流進行處理的場景。 簡單劃分的話,多流轉(zhuǎn)換可以分為“

    2024年02月14日
    瀏覽(28)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(5)轉(zhuǎn)換算子(Transformation)【分流】

    所謂 “分流” ,就是將一條數(shù)據(jù)流拆分成完全獨立的兩條、甚至多條流。也就是基于一個 DataStream ,定義一些篩選條件,將符合條件的數(shù)據(jù)揀選出來放到對應(yīng)的流里。 其實根據(jù)條件篩選數(shù)據(jù)的需求,本身非常容易實現(xiàn):只要針對同一條流多次獨立調(diào)用 .filter() 方法進行篩選

    2024年01月24日
    瀏覽(17)
  • Flink之SideOutput(數(shù)據(jù)分流)

    Flink在早期版本有一個 split 算子用來做 數(shù)據(jù)分流 使用的,但是在 flink-1.12 開始這個 API 就已經(jīng)被刪除了,在 1.12 版本以后我們是通過 process 算子來做數(shù)據(jù)分流的,這里就介紹一下如何使用 prodess 進行數(shù)據(jù)分流. 代碼 結(jié)果數(shù)據(jù) 通過結(jié)果內(nèi)容可以看到數(shù)據(jù)完全按照我們分流的邏輯進

    2024年02月14日
    瀏覽(21)
  • Spring SpEL在Flink中的應(yīng)用-與Filter結(jié)合實現(xiàn)數(shù)據(jù)動態(tài)分流

    SpEL表達式與Flink fiter結(jié)合可以實現(xiàn)基于表達式的靈活動態(tài)過濾。有關(guān)SpEL表達式的使用請參考 Spring SpEL在Flink中的應(yīng)用-SpEL詳解 。 可以將過濾規(guī)則放入數(shù)據(jù)庫,根據(jù)不同的數(shù)據(jù)設(shè)置不同的過濾表達式,從而實現(xiàn)只需修改過濾表達式不用修改Flink代碼的功能。對于基于Flink進行數(shù)

    2024年01月25日
    瀏覽(12)
  • flink學(xué)習(xí)之廣播流與合流操作demo

    flink學(xué)習(xí)之廣播流與合流操作demo

    廣播流是什么? 將一條數(shù)據(jù)廣播到所有的節(jié)點。使用 dataStream.broadCast() 廣播流使用場景? 一般用于動態(tài)加載配置項。比如lol,每天不斷有人再投訴舉報,客服根本忙不過來,騰訊內(nèi)部做了一個判斷,只有vip3以上的客戶的投訴才會有人工一對一回復(fù),過了一段時間大家都發(fā)現(xiàn)

    2024年02月09日
    瀏覽(16)
  • 【Flink實戰(zhàn)】Flink中的分流

    Flink中的分流 在Flink中將數(shù)據(jù)流切分為多個子數(shù)據(jù)流,子數(shù)據(jù)流稱為”旁路輸出數(shù)據(jù)流“。 拆分流數(shù)據(jù)的方式 Split,已經(jīng)廢棄,不推薦使用 Fliter SideOut,推薦使用 Fliter分流的Java實現(xiàn) SideOut分流的Java實現(xiàn) SideOutPut 是 Flink 框架推薦的分流方法,在使用 SideOutPut 時,需要按照以下

    2024年02月10日
    瀏覽(21)
  • Flink多流處理之join(關(guān)聯(lián))

    Flink的 API 中只提供了 join 的算子,并沒有 left join 或者 right join ,這里我們就介紹一下 join 算子的使用,其實 join 算子底層調(diào)用的就是 coGroup ,具體原理這里就不過多介紹了,如果感興趣可以看我前面發(fā)布的文章Flink多流操作之coGroup. 數(shù)據(jù)源 代碼 結(jié)果 這個 API 使用起來還是比較簡單

    2024年02月13日
    瀏覽(25)
  • Flink+Paimon多流拼接性能優(yōu)化實戰(zhàn)

    Flink+Paimon多流拼接性能優(yōu)化實戰(zhàn)

    目錄 (零)本文簡介 意外收獲: (一)背景 (二)探索梳理過程 (三)源碼改造 (四)修改效果 1、JOB狀態(tài) 2、Level5的dataFile總大小 3、數(shù)據(jù)延遲 4、關(guān)聯(lián)率 (五)未來展望:異步Compact Paimon多流拼接/合并性能優(yōu)化; ? ? ? ? 為解決 離線T+1多流拼接數(shù)據(jù)時效性 、 Flink實時

    2024年02月09日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包