国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-維表來源于第三方數據源

這篇具有很好參考價值的文章主要介紹了【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-維表來源于第三方數據源。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Flink 系列文章

一、Flink 專欄

Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關基礎內容。

  • 2、Flink基礎系列
    本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。

  • 3、Flik Table API和SQL基礎系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫、表用法、查詢、窗口函數、catalog等等內容。

  • 4、Flik Table API和SQL提高與應用系列
    本部分是table api 和sql的應用部分,和實際的生產應用聯系更為密切,以及有一定開發(fā)難度的內容。

  • 5、Flink 監(jiān)控系列
    本部分和實際的運維、監(jiān)控工作相關。

二、Flink 示例專欄

Flink 示例專欄是 Flink 專欄的輔助說明,一般不會介紹知識點的信息,更多的是提供一個一個可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內容。

兩專欄的所有文章入口點擊:Flink 系列文章匯總索引



本文介紹了Flink 通過異步IO的方式訪問第三方數據源,介紹了2個示例,即通過緩存和redis的方式。

如果需要了解更多內容,可以在本人Flink 專欄中了解更新系統(tǒng)的內容。

本文除了maven依賴外,本文還依賴redis的環(huán)境。

本專題分為以下幾篇文章:
【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-初始化的靜態(tài)數據
【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-維表來源于第三方數據源
【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-通過廣播將維表數據傳遞到下游
【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-通過Temporal table實現維表數據join
【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-完整版(1)
【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-完整版(2)文章來源地址http://www.zghlxwxcb.cn/news/detail-810719.html

一、maven依賴及數據結構

1、maven依賴

本文的所有示例均依賴本部分的pom.xml內容,可能針對下文中的某些示例存在過多的引入,根據自己的情況進行刪減。

<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.17.0</flink.version>
</properties>

<dependencies>
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-common</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-java-bridge</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-planner_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-java-uber</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-runtime</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-jdbc</artifactId>
		<version>3.1.0-1.17</version>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.38</version>
	</dependency>
	<dependency>
		<groupId>com.google.guava</groupId>
		<artifactId>guava</artifactId>
		<version>32.0.1-jre</version>
	</dependency>
	<!-- flink連接器 -->
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-sql-connector-kafka</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
	<dependency>
		<groupId>org.apache.commons</groupId>
		<artifactId>commons-compress</artifactId>
		<version>1.24.0</version>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.2</version>
	</dependency>
	<dependency>
		<groupId>org.apache.bahir</groupId>
		<artifactId>flink-connector-redis_2.12</artifactId>
		<version>1.1.0</version>
		<exclusions>
			<exclusion>
				<artifactId>flink-streaming-java_2.12</artifactId>
				<groupId>org.apache.flink</groupId>
			</exclusion>
			<exclusion>
				<artifactId>flink-runtime_2.12</artifactId>
				<groupId>org.apache.flink</groupId>
			</exclusion>
			<exclusion>
				<artifactId>flink-core</artifactId>
				<groupId>org.apache.flink</groupId>
			</exclusion>
			<exclusion>
				<artifactId>flink-java</artifactId>
				<groupId>org.apache.flink</groupId>
			</exclusion>
			<exclusion>
				<groupId>org.apache.flink</groupId>
				<artifactId>flink-table-api-java</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.apache.flink</groupId>
				<artifactId>flink-table-api-java-bridge_2.12</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.apache.flink</groupId>
				<artifactId>flink-table-common</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.apache.flink</groupId>
				<artifactId>flink-table-planner_2.12</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>2.0.43</version>
	</dependency>
</dependencies>

2、數據結構

本示例僅僅為實現需求:將訂單中uId與用戶id進行關聯,然后輸出Tuple2<Order, String>。

  • 事實流 order
    // 事實表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }
  • 維度流 user
    // 維表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

3、數據源

事實流數據有幾種,具體見示例部分,比如socket、redis、kafka等
維度表流有幾種,具體見示例部分,比如靜態(tài)數據、mysql、socket、kafka等。
如此,實現本文中的示例就需要準備好相應的環(huán)境,即mysql、redis、kafka、netcat等。

4、驗證結果

本文提供的所有示例均為驗證通過的示例,測試的數據均在每個示例中,分為事實流、維度流和運行結果進行注釋,在具體的示例中關于驗證不再贅述。

二、維表來源于第三方數據源

1、說明

這種方式是將維表數據存儲在Redis、HBase、MySQL等外部存儲中,事實流在關聯維表數據的時候實時去外部存儲中查詢。

由于維度數據量不受內存限制,可以存儲很大的數據量。同時維表數據來源于第三方數據源,讀取速度受制于外部存儲的讀取速度。一般常見的做法該種方式較多。

2、示例:將事實流與維表進行關聯-通過緩存降低性能開銷

如果頻繁的訪問第三方數據源進行join,會帶來很大的開銷,為降低該種情況的開銷,一般使用cache來減輕訪問壓力,但該種方式存在數據同步的不一致或延遲情況。如果使用緩存,則會存在將數據存在內存中,也會增加系統(tǒng)開銷。該種情況的實際應用以具體的業(yè)務場景而定。本示例使用的是guava Cache,緩存的實現有很多種方式,具體以自己的實際情況進行選擇。

本示例的數據源僅僅以靜態(tài)的數據進行展示,實際上可能數據來源于Hbase、mysql等。

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestJoinDimFromCacheDataDemo {
    // 維表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // 事實表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order 實時流
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user 維表
        DataStream<Tuple2<Order, String>> result = orderDs.map(new RichMapFunction<Order, Tuple2<Order, String>>() {
            // 緩存接口這里是LoadingCache,LoadingCache在緩存項不存在時可以自動加載緩存
            LoadingCache<Integer, User> userDim;

            @Override
            public void open(Configuration parameters) throws Exception {
                // 使用google LoadingCache來進行緩存
                // CacheBuilder的構造函數是私有的,只能通過其靜態(tài)方法newBuilder()來獲得CacheBuilder的實例
                userDim = CacheBuilder.newBuilder()
                        // 設置并發(fā)級別為8,并發(fā)級別是指可以同時寫緩存的線程數
                        .concurrencyLevel(8)
                        // 最多緩存?zhèn)€數,超過了就根據最近最少使用算法來移除緩存
                        .maximumSize(1000)
                        // 設置寫緩存后10分鐘過期
                        .expireAfterWrite(10, TimeUnit.MINUTES)
                        // 設置緩存容器的初始容量為10
                        .initialCapacity(10)
                        // 設置要統(tǒng)計緩存的命中率
                        .recordStats()
                        // 指定移除通知
                        .removalListener(new RemovalListener<Integer, User>() {
                            @Override
                            public void onRemoval(RemovalNotification<Integer, User> removalNotification) {
                                System.out.println(removalNotification.getKey() + "被移除了,值為:" + removalNotification.getValue());
                            }
                        })
                        .build(
                                // 指定加載緩存的邏輯
                                new CacheLoader<Integer, User>() {
                                    @Override
                                    public User load(Integer uId) throws Exception {
                                        return dataSource(uId);
                                    }
                                });
                System.out.println("userDim:" + userDim.get(1002));
            }

            private User dataSource(Integer uId) {
                // 可以是任何數據源,本處僅僅示例
                Map<Integer, User> users = new HashMap<>();
                users.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
                users.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"));
                users.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"));
                users.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"));
                users.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));
                User user = null;
                if (users.containsKey(uId)) {
                    user = users.get(uId);
                }

                return user;
            }

            @Override
            public Tuple2<Order, String> map(Order value) throws Exception {
                return new Tuple2(value, userDim.get(value.getUId()).getName());
            }

        });

        result.print();
        // 輸入數據
        // 7,1003,111
        // 8,1005,234
        // 9,1002,875

        // 控制臺輸出數據
        // 5> (TestJoinDimFromCacheDataDemo.Order(id=7, uId=1003, total=111.0),alanchanchn)
        // 6> (TestJoinDimFromCacheDataDemo.Order(id=8, uId=1005,  total=234.0),alan_chan_chn)
        // 7> (TestJoinDimFromCacheDataDemo.Order(id=9, uId=1002, total=875.0),alanchan)

        env.execute("TestJoinDimFromCacheDataDemo");
    }
}

3、示例:將事實流與維表進行關聯-通過Flink 的異步 I/O提高系統(tǒng)效率

Flink與外部存儲系統(tǒng)進行讀寫操作的時候可以使用同步方式,也就是發(fā)送一個請求后等待外部系統(tǒng)響應,然后再發(fā)送第二個讀寫請求,這樣的方式吞吐量比較低,可以用提高并行度的方式來提高吞吐量,但是并行度多了也就導致了進程數量多了,占用了大量的資源。

Flink中可以使用異步IO來讀寫外部系統(tǒng),這要求外部系統(tǒng)客戶端支持異步IO,比如redis、MongoDB等。

更多內容見文章:
55、Flink之用于外部數據訪問的異步 I/O介紹及示例

1)、redis 異步I/O實現

package org.tablesql.join;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.tablesql.join.TestJoinDimFromAsyncDataStreamDemo.Order;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class JoinAyncFunctionByRedis extends RichAsyncFunction<Order, Tuple2<Order, String>> {
    private JedisPoolConfig config = null;

    private static String ADDR = "192.168.10.41";
    private static int PORT = 6379;
    private static int TIMEOUT = 10000;
    private JedisPool jedisPool = null;
    private Jedis jedis = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        config = new JedisPoolConfig();
        jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);

        jedis = jedisPool.getResource();
    }

    @Override
    public void asyncInvoke(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) throws Exception {
        // order 實時流中的單行數據
        System.out.println("輸入參數input----:" + input);
        // 發(fā)起一個異步請求,返回結果
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                // 數據格式:1002,alanchan,19,25,alan.chan.chn@163.com
                String userLine = jedis.hget("AsyncReadUserById_Redis", input.getUId() + "");
                String[] userTemp = userLine.split(",");
                // 返回 用戶名
                return userTemp[1];
            }
        }).thenAccept((String dbResult) -> {
            // 設置請求完成時的回調,將結果返回
            List list = new ArrayList<Tuple2<Order, String>>();
            list.add(new Tuple2<>(input, dbResult));
            resultFuture.complete(list);
        });
    }

    // 連接超時的時候調用的方法
    public void timeout(Order input, ResultFuture<Tuple2<Order, String>> resultFuture)
            throws Exception {
        List list = new ArrayList<Tuple2<Order, String>>();
        // 數據源超時,不能獲取到維表信息,置為"
        list.add(new Tuple2<>(input, ""));
        resultFuture.complete(list);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis.isConnected()) {
            jedis.close();
        }

    }
}

2)、實現事實流與維度流join


package org.tablesql.join;

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestJoinDimFromAsyncDataStreamDemo {
    // 維表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // 事實表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        testJoinAyncFunctionByRedis();
    }

    static void testJoinAyncFunctionByRedis() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order 實時流
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // 保證順序:異步返回的結果保證順序,超時時間1秒,最大容量2,超出容量觸發(fā)反壓
        DataStream<Tuple2<Order, String>> result = AsyncDataStream.orderedWait(orderDs, new JoinAyncFunctionByRedis(),
                1000L, TimeUnit.MILLISECONDS, 2);

        result.print("result:");

        // 允許亂序:異步返回的結果允許亂序,超時時間1秒,最大容量2,超出容量觸發(fā)反壓
        DataStream<Tuple2<Order, String>> unorderedResult = AsyncDataStream
                .unorderedWait(orderDs, new JoinAyncFunctionByRedis(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);
        unorderedResult.print("unorderedResult");
        
        // redis的操作命令及數據
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1001 '1001,alan,18,20,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1002 '1002,alanchan,19,25,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1003 '1003,alanchanchn,20,30,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1004 '1004,alan_chan,27,20,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1005 '1005,alan_chan_chn,36,10,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hgetall AsyncReadUserById_Redis
        // 1) "1001"
        // 2) "1001,alan,18,20,alan.chan.chn@163.com"
        // 3) "1002"
        // 4) "1002,alanchan,19,25,alan.chan.chn@163.com"
        // 5) "1003"
        // 6) "1003,alanchanchn,20,30,alan.chan.chn@163.com"
        // 7) "1004"
        // 8) "1004,alan_chan,27,20,alan.chan.chn@163.com"
        // 9) "1005"
        // 10) "1005,alan_chan_chn,36,10,alan.chan.chn@163.com"
        
        // 輸入數據
        // 13,1002,811
        // 14,1004,834
        // 15,1005,975

        // 控制臺輸出數據
        // 輸入參數input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // result::12> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,alan.chan.chn@163.com)
        // 輸入參數input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // unorderedResult:9> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,alan.chan.chn@163.com)
        // result::5> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // 輸入參數input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0)
        // unorderedResult:2> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // 輸入參數input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // result::6> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)
        // 輸入參數input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // unorderedResult:3> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)

        env.execute("TestJoinDimFromAsyncDataStreamDemo");
    }

}

以上,本文介紹了Flink 通過異步IO的方式訪問第三方數據源,介紹了2個示例,即通過緩存和redis的方式。

本專題分為以下幾篇文章:
【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-初始化的靜態(tài)數據
【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-維表來源于第三方數據源
【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-通過廣播將維表數據傳遞到下游
【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-通過Temporal table實現維表數據join
【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-完整版(1)
【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-完整版(2)

到了這里,關于【flink番外篇】15、Flink維表實戰(zhàn)之6種實現方式-維表來源于第三方數據源的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

    【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年01月19日
    瀏覽(39)
  • 【flink番外篇】1、flink的23種常用算子介紹及詳細示例(完整版)

    【flink番外篇】1、flink的23種常用算子介紹及詳細示例(完整版)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年02月05日
    瀏覽(36)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年02月01日
    瀏覽(26)
  • 【flink番外篇】13、Broadcast State 模式示例(完整版)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年01月17日
    瀏覽(26)
  • 【flink番外篇】16、DataStream 和 Table 相互轉換示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年01月17日
    瀏覽(27)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年01月24日
    瀏覽(19)
  • 【flink番外篇】21、Flink 通過SQL client 和 table api注冊catalog示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年02月21日
    瀏覽(26)
  • 【flink番外篇】13、Broadcast State 模式示例-簡單模式匹配(1)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年01月19日
    瀏覽(40)
  • 【flink番外篇】19、Datastream數據類型到Table schema映射示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年01月21日
    瀏覽(30)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(14)- 時態(tài)表的join(java版本)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 3、

    2024年02月02日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包