国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink多流轉(zhuǎn)換(2)—— 雙流連結(jié)

這篇具有很好參考價(jià)值的文章主要介紹了Flink多流轉(zhuǎn)換(2)—— 雙流連結(jié)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

雙流連結(jié)(Join):根據(jù)某個(gè)字段的值將數(shù)據(jù)聯(lián)結(jié)起來(lái),“配對(duì)”去做處理

窗口聯(lián)結(jié)(Window Join)

可以定義時(shí)間窗口,并將兩條流中共享一個(gè)公共鍵(key)的數(shù)據(jù)放在窗口中進(jìn)行配對(duì)處理

代碼邏輯

首先需要調(diào)用 DataStream 的.join()方法來(lái)合并兩條流,得到一個(gè) JoinedStreams;接著通過(guò).where().equalTo()方法指定兩條流中聯(lián)結(jié)的 key;然后通過(guò).window()開(kāi)窗口,并調(diào)用.apply()傳入聯(lián)結(jié)窗口函數(shù)進(jìn)行處理計(jì)算

stream1.join(stream2)
 .where(<KeySelector>)
 .equalTo(<KeySelector>)
 .window(<WindowAssigner>)
 .apply(<JoinFunction>)

對(duì)于JoinFunction

public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
 OUT join(IN1 first, IN2 second) throws Exception;
}

使用時(shí)需要實(shí)現(xiàn)內(nèi)部的join方法,有兩個(gè)參數(shù),分別表示兩條流中成對(duì)匹配的數(shù)據(jù)

JoinFunciton 并不是真正的“窗口函數(shù)”,它只是定義了窗口函數(shù)在調(diào)用時(shí)對(duì)匹配數(shù)據(jù)的具體處理邏輯

實(shí)現(xiàn)流程

Flink多流轉(zhuǎn)換(2)—— 雙流連結(jié),大數(shù)據(jù),Flink,flink,java,服務(wù)器,大數(shù)據(jù)

兩條流的數(shù)據(jù)到來(lái)之后,首先會(huì)按照 key 分組、進(jìn)入對(duì)應(yīng)的窗口中存儲(chǔ);當(dāng)?shù)竭_(dá)窗口結(jié)束時(shí)間時(shí),算子會(huì)先統(tǒng)計(jì)出窗口內(nèi)兩條流的數(shù)據(jù)的所有組合,也就是對(duì)兩條流中的數(shù)據(jù)做一個(gè)笛卡爾積(相當(dāng)于表的交叉連接,cross join),然后進(jìn)行遍歷,把每一對(duì)匹配的數(shù)據(jù),作為參數(shù)(first,second)傳入 JoinFunction 的.join()方法進(jìn)行計(jì)算處理。所以窗口中每有一對(duì)數(shù)據(jù)成功聯(lián)結(jié)匹配,JoinFunction 的.join()方法就會(huì)被調(diào)用一次,并輸出一個(gè)結(jié)果

實(shí)例分析

// 基于窗口的join
public class WindowJoinTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L),
                        Tuple2.of("b", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L),
                        Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L),
                        Tuple2.of("b", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        stream1
                .join(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> left, Tuple2<String, Long> right) throws Exception {
                        return left + "=>" + right;
                    }
                })
                .print();

        env.execute();
    }
}

運(yùn)行結(jié)果如下:

Flink多流轉(zhuǎn)換(2)—— 雙流連結(jié),大數(shù)據(jù),Flink,flink,java,服務(wù)器,大數(shù)據(jù)

間隔聯(lián)結(jié)(Interval Join)

統(tǒng)計(jì)一段時(shí)間內(nèi)的數(shù)據(jù)匹配情況,不應(yīng)該用滾動(dòng)窗口或滑動(dòng)窗口來(lái)處理——因?yàn)槠ヅ涞膬蓚€(gè)數(shù)據(jù)有可能剛好“卡在”窗口邊緣兩側(cè)

因此需要采用”間隔聯(lián)結(jié)“的操作,針對(duì)一條流的每個(gè)數(shù)據(jù),開(kāi)辟出其時(shí)間戳前后的一段時(shí)間間隔,看這期間是否有來(lái)自另一條流的數(shù)據(jù)匹配

原理

給定兩個(gè)時(shí)間點(diǎn),分別叫作間隔的“上界”(upperBound)和“下界”(lowerBound);

于是對(duì)于一條流(不妨叫作 A)中的任意一個(gè)數(shù)據(jù)元素 a,就可以開(kāi)辟一段時(shí)間間隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以 a 的時(shí)間戳為中心,下至下界點(diǎn)、上至上界點(diǎn)的一個(gè)閉區(qū)間:我們就把這段時(shí)間作為可以匹配另一條流數(shù)據(jù)的“窗口”范圍

所以對(duì)于另一條流(不妨叫 B)中的數(shù)據(jù)元素 b,如果它的時(shí)間戳落在了這個(gè)區(qū)間范圍內(nèi),a 和 b 就可以成功配對(duì),進(jìn)而進(jìn)行計(jì)算輸出結(jié)果

匹配條件為:a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

舉例分析如下:

Flink多流轉(zhuǎn)換(2)—— 雙流連結(jié),大數(shù)據(jù),Flink,flink,java,服務(wù)器,大數(shù)據(jù)

下方的流 A 去間隔聯(lián)結(jié)上方的流 B,所以基于 A 的每個(gè)數(shù)據(jù)元素,都可以開(kāi)辟一個(gè)間隔區(qū)間。我們這里設(shè)置下界為-2 毫秒,上界為 1 毫秒。于是對(duì)于時(shí)間戳為 2 的 A 中元素,它的可匹配區(qū)間就是[0, 3],流 B 中有時(shí)間戳為 0、1 的兩個(gè)元素落在這個(gè)范圍內(nèi),所以就可以得到匹配數(shù)據(jù)對(duì)(2, 0)和(2, 1)。同樣地,A 中時(shí)間戳為 3 的元素,可匹配區(qū)間為[1, 4],B 中只有時(shí)間戳為 1 的一個(gè)數(shù)據(jù)可以匹配,于是得到匹配數(shù)據(jù)對(duì)(3, 1)

代碼邏輯

基于KeyedStream進(jìn)行聯(lián)結(jié)操作:


①DataStream 通過(guò) keyBy 得到KeyedStream

②調(diào)用.intervalJoin()來(lái)合并兩條流,得到的是一個(gè) IntervalJoin 類型

③通過(guò).between()方法指定間隔的上下界,再調(diào)用.process()方法,定義對(duì)匹配數(shù)據(jù)對(duì)的處理操作

④調(diào)用.process()需要傳入一個(gè)處理函數(shù):ProcessJoinFunction

stream1
 .keyBy(<KeySelector>)
 .intervalJoin(stream2.keyBy(<KeySelector>))
 .between(Time.milliseconds(-2), Time.milliseconds(1))
 .process (new ProcessJoinFunction<Integer, Integer, String(){
 	@Override
 	public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
 	out.collect(left + "," + right);
 }
 });

實(shí)例分析

// 基于間隔的join
public class IntervalJoinTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env.fromElements(
                Tuple3.of("Mary", "order-1", 5000L),
                Tuple3.of("Alice", "order-2", 5000L),
                Tuple3.of("Bob", "order-3", 20000L),
                Tuple3.of("Alice", "order-4", 20000L),
                Tuple3.of("Cary", "order-5", 51000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                })
        );

        SingleOutputStreamOperator<Event> clickStream = env.fromElements(
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 36000L),
                new Event("Bob", "./home", 30000L),
                new Event("Bob", "./prod?id=1", 23000L),
                new Event("Bob", "./prod?id=3", 33000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })
        );

        orderStream.keyBy(data -> data.f0)
                .intervalJoin(clickStream.keyBy(data -> data.user))
                .between(Time.seconds(-5), Time.seconds(10))
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(right + " => " + left);
                    }
                })
                .print();

        env.execute();
    }}

運(yùn)行結(jié)果如下:

Flink多流轉(zhuǎn)換(2)—— 雙流連結(jié),大數(shù)據(jù),Flink,flink,java,服務(wù)器,大數(shù)據(jù)

窗口同組聯(lián)結(jié)(Window CoGroup)

用法跟 window join非常類似,也是將兩條流合并之后開(kāi)窗處理匹配的元素,調(diào)用時(shí)只需要將.join()換為.coGroup()就可以了

代碼邏輯

stream1.coGroup(stream2)
 .where(<KeySelector>)
 .equalTo(<KeySelector>)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
 .apply(<CoGroupFunction>)

與 window join 的區(qū)別在于,調(diào)用.apply()方法定義具體操作時(shí),傳入的是一個(gè)CoGroupFunction

public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
 void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}

coGroup方法與FlatJoinFunction.join()方法類似,其中的三個(gè)參數(shù)依舊是兩條流中的數(shù)據(jù)以及用于輸出的收集器;但前兩個(gè)參數(shù)不再是單獨(dú)的配對(duì)數(shù)據(jù),而是可遍歷的數(shù)據(jù)集合;

因此該方法中直接把收集到的所有數(shù)據(jù)一次性傳入,然后自定義配對(duì)方式,不需要再計(jì)算窗口中兩條流數(shù)據(jù)集的笛卡爾積;

實(shí)例分析

// 基于窗口的join
public class CoGroupTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L),
                        Tuple2.of("b", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L),
                        Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L),
                        Tuple2.of("b", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        stream1
                .coGroup(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> iter1, Iterable<Tuple2<String, Long>> iter2, Collector<String> collector) throws Exception {
                        collector.collect(iter1 + "=>" + iter2);
                    }
                })
                .print();

        env.execute();
    }
}

運(yùn)行結(jié)果如下:

Flink多流轉(zhuǎn)換(2)—— 雙流連結(jié),大數(shù)據(jù),Flink,flink,java,服務(wù)器,大數(shù)據(jù)

學(xué)習(xí)課程鏈接:【尚硅谷】Flink1.13實(shí)戰(zhàn)教程(涵蓋所有flink-Java知識(shí)點(diǎn))_嗶哩嗶哩_bilibili??文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-825459.html

到了這里,關(guān)于Flink多流轉(zhuǎn)換(2)—— 雙流連結(jié)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink-多流轉(zhuǎn)換(Union、Connect、Join)

    Flink-多流轉(zhuǎn)換(Union、Connect、Join)

    無(wú)論是基本的簡(jiǎn)單轉(zhuǎn)換和聚合,還是基于窗口的計(jì)算,我們都是針對(duì)一條流上的數(shù)據(jù)進(jìn)行處理的。而在實(shí)際應(yīng)用中,可能需要將不同來(lái)源的數(shù)據(jù)連接合并在一起處理,也有可能需要將一條流拆分開(kāi),所以經(jīng)常會(huì)有對(duì)多條流進(jìn)行處理的場(chǎng)景。 簡(jiǎn)單劃分的話,多流轉(zhuǎn)換可以分為“

    2024年02月14日
    瀏覽(28)
  • 說(shuō)說(shuō)Flink雙流join

    說(shuō)說(shuō)Flink雙流join

    Flink雙流JOIN主要分為兩大類 一類是基于原生State的Connect算子操作 另一類是基于窗口的JOIN操作。其中基于窗口的JOIN可細(xì)分為window join和interval join兩種。 基于原生State的Connect算子操作 實(shí)現(xiàn)原理:底層原理依賴Flink的State狀態(tài)存儲(chǔ),通過(guò)將數(shù)據(jù)存儲(chǔ)到State中進(jìn)行關(guān)聯(lián)join, 最終輸出

    2024年02月10日
    瀏覽(25)
  • Flink雙流(join)

    Flink雙流(join)

    Join大體分類只有兩種: Window Join和Interval Join Window Join有可以根據(jù)Window的類型細(xì)分出3種: Tumbling(滾動(dòng)) Window Join、Sliding(滑動(dòng)) Window Join、Session(會(huì)話) Widnow Join。 ??????????Window 類型的join都是利用window的機(jī)制,先將數(shù)據(jù)緩存在Window State中,當(dāng)窗口觸發(fā)計(jì)算時(shí),執(zhí)行join操作

    2024年02月22日
    瀏覽(17)
  • flink重溫筆記(十三): flink 高級(jí)特性和新特性(2)——ProcessFunction API 和 雙流 join

    flink重溫筆記(十三): flink 高級(jí)特性和新特性(2)——ProcessFunction API 和 雙流 join

    前言:今天是學(xué)習(xí) flink 的第 13 天啦!學(xué)習(xí)了 flink 高級(jí)特性和新特性之ProcessFunction API 和 雙流 join,主要是解決大數(shù)據(jù)領(lǐng)域數(shù)據(jù)從數(shù)據(jù)增量聚合的問(wèn)題,以及快速變化中的流數(shù)據(jù)拉寬問(wèn)題,即變化中多個(gè)數(shù)據(jù)源合并在一起的問(wèn)題,結(jié)合自己實(shí)驗(yàn)猜想和代碼實(shí)踐,總結(jié)了很多自

    2024年03月12日
    瀏覽(24)
  • flink雙流ioin的大狀態(tài)如何解決和調(diào)優(yōu)

    Flink 中的雙流 ioin 操作(雙流連接)通常涉及大狀態(tài)的處理,這可能導(dǎo)致一些性能和狀態(tài)管理的挑戰(zhàn)。以下是解決和調(diào)優(yōu) Flink 中雙流 ioin 大狀態(tài)的一些建議: 解決方案: 增大任務(wù)管理器的堆內(nèi)存: 對(duì)于處理大狀態(tài)的任務(wù),增加 Flink 任務(wù)管理器的堆內(nèi)存可以提供更多的內(nèi)存

    2024年01月22日
    瀏覽(33)
  • Flink多流處理之join(關(guān)聯(lián))

    Flink的 API 中只提供了 join 的算子,并沒(méi)有 left join 或者 right join ,這里我們就介紹一下 join 算子的使用,其實(shí) join 算子底層調(diào)用的就是 coGroup ,具體原理這里就不過(guò)多介紹了,如果感興趣可以看我前面發(fā)布的文章Flink多流操作之coGroup. 數(shù)據(jù)源 代碼 結(jié)果 這個(gè) API 使用起來(lái)還是比較簡(jiǎn)單

    2024年02月13日
    瀏覽(25)
  • Flink多流處理之Broadcast(廣播變量)

    Flink多流處理之Broadcast(廣播變量)

    寫過(guò)Spark批處理的應(yīng)該都知道,有一個(gè)廣播變量 broadcast 這樣的一個(gè)算子,可以優(yōu)化我們計(jì)算的過(guò)程,有效的提高效率;同樣在Flink中也有 broadcast ,簡(jiǎn)單來(lái)說(shuō)和Spark中的類似,但是有所區(qū)別,首先Spark中的 broadcast 是靜態(tài)的數(shù)據(jù),而Flink中的 broadcast 是動(dòng)態(tài)的,也就是源源不斷的數(shù)據(jù)流.在Fl

    2024年02月13日
    瀏覽(25)
  • Flink+Paimon多流拼接性能優(yōu)化實(shí)戰(zhàn)

    Flink+Paimon多流拼接性能優(yōu)化實(shí)戰(zhàn)

    目錄 (零)本文簡(jiǎn)介 意外收獲: (一)背景 (二)探索梳理過(guò)程 (三)源碼改造 (四)修改效果 1、JOB狀態(tài) 2、Level5的dataFile總大小 3、數(shù)據(jù)延遲 4、關(guān)聯(lián)率 (五)未來(lái)展望:異步Compact Paimon多流拼接/合并性能優(yōu)化; ? ? ? ? 為解決 離線T+1多流拼接數(shù)據(jù)時(shí)效性 、 Flink實(shí)時(shí)

    2024年02月09日
    瀏覽(29)
  • Flink多流處理之connect拼接流

    Flink多流處理之connect拼接流

    Flink中的拼接流 connect 的使用其實(shí)非常簡(jiǎn)單,就是 leftStream.connect(rightStream) 的方式,但是有一點(diǎn)我們需要清楚,使用 connect 后并不是將兩個(gè)流給串聯(lián)起來(lái)了,而是將左流和右流建立一個(gè)聯(lián)系,作為一個(gè)大的流,并且這個(gè)大的流可以使用相同的邏輯處理 leftStream 和 rightStream ,也可以使用不

    2024年02月13日
    瀏覽(35)
  • flink多流操作(connect cogroup union broadcast)

    flink多流操作(connect cogroup union broadcast)

    connect 翻譯成中文意為連接,可以將兩個(gè)數(shù)據(jù)類型一樣也可以類型不一樣 DataStream 連接成一個(gè)新 的 ConnectedStreams。需要注意的是,connect 方法與 union 方法不同,雖然調(diào)用 connect 方法將兩個(gè) 流連接成一個(gè)新的 ConnectedStreams,但是里面的兩個(gè)流依然是相互獨(dú)立的,這個(gè)方法最大的

    2024年02月21日
    瀏覽(21)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包