国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink重溫筆記(二):Flink 流批一體 API 開發(fā)——Source 數(shù)據(jù)源操作

這篇具有很好參考價(jià)值的文章主要介紹了flink重溫筆記(二):Flink 流批一體 API 開發(fā)——Source 數(shù)據(jù)源操作。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

Flink學(xué)習(xí)筆記

前言:今天是第二天啦!開始學(xué)習(xí) Flink 流批一體化開發(fā)知識點(diǎn),重點(diǎn)學(xué)習(xí)了各類數(shù)據(jù)源的導(dǎo)入操作,我發(fā)現(xiàn)學(xué)習(xí)編程需要分類記憶,一次一次地猜想 api 作用,然后通過敲代碼印證自己的想法,以此理解知識點(diǎn),加深對api的理解和應(yīng)用。
Tips:我覺得學(xué)習(xí) Flink 還是挺有意思的,雖然學(xué)習(xí)進(jìn)度有點(diǎn)慢,但是數(shù)據(jù)源已經(jīng)理解清楚了,我相信接下來一切會(huì)越來越好的!

二、Flink 流批一體 API 開發(fā)

1. 輸入數(shù)據(jù)集 Data Source

1.1 預(yù)定義 Source
1.1.1 基于本地集合的 Source
  • (1) env.fromElements()
# 兩種輸入類型,一種是元素,一種是元組
DataStreamSource<Object> ds1 = env.fromElements("hadoop","spark", "spark", "flink");

List<Tuple2<String,Long>> tuple2List  = new ArrayList<>();
tuple2List.add(Tuple2.of("hadoop",1L));
tuple2List.add(Tuple2.of("spark", 2L));
tuple2List.add(Tuple2.of("flink", 3L));
DataStreamSource<List<Tuple2<String, Long>>> ds2 = env.fromElements(tuple2List);

# 輸出-1
6> spark
4> hadoop
5> spark
7> flink

# 輸出-2
6> [(hadoop,1), (spark,2), (flink,3)]
  • (2) env.fromCollection()
# 傳入列表
DataStreamSource<String> ds3 = env.fromCollection(Arrays.asList("spark", "flink", "hadoop"));

# 輸出-3
8> hadoop
6> spark
7> flink

# fromParallelCollection 并行度隊(duì)列(0-10閉區(qū)間)
DataStreamSource<Long> parallelCollection  = env.fromParallelCollection(
                new NumberSequenceIterator(0L, 10L),
                TypeInformation.of(Long.TYPE)
        ).setParallelism(3);
# 亂序輸出 -parallelCollection

8> 8
2> 10
8> 7
6> 3
6> 5
3> 0
7> 6
1> 9
5> 2
5> 4
4> 1
  • (3) env.generateSequence()
# 傳入隊(duì)列(左開右閉區(qū)間)
DataStreamSource<Long> ds4 = env.generateSequence(1, 10);

# 輸出 -4
8> 8
3> 3
2> 2
5> 5
1> 1
1> 9
7> 7
6> 6
4> 4
2> 10
  • (4) env.fromSequence()
# 傳入隊(duì)列(左開右閉區(qū)間)
DataStreamSource<Long> ds5 = env.fromSequence(1, 10);

# 輸出 -5
1> 8
7> 6
6> 10
2> 5
3> 1
3> 2
8> 7
4> 9
5> 3
5> 4
1.1.2 基于文件的 Source
  • (1) 批的方式讀取文本文件:env.readTextFile(path)
package cn.itcast.day02.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-12 23:47:53
 * @description TODO:批的方式讀取文件
 */
public class BatchFromFile {
    public static void main(String[] args) throws Exception {
        // 配置端口號信息
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",8081);
        // 初始化 UI 環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        // 讀取數(shù)據(jù)源
        String path = "D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\wordcount.txt";
        DataStreamSource<String> lines = env.readTextFile(path);
        // 數(shù)據(jù)源并行度
        int parallelism = lines.getParallelism();
        System.out.println("ReadTextFileDemo創(chuàng)建的DataStream的并行度為:" + parallelism);
        lines.print();
        env.execute();
    }
}
  • (2) 流的方式讀取文本文件:env.readFile()
    • 細(xì)節(jié)點(diǎn):流式處理 PROCESS_CONTINUOUSLY 時(shí),文件狀態(tài)改變才能觸發(fā)重新打印一次
package cn.itcast.day02.source;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

/**
 * @author lql
 * @time 2024-02-13 15:34:11
 * @description TODO:流的方式讀取數(shù)據(jù)源,無限流
 */
public class StreamFromFile {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        String path = "./data/input/wordcount.txt";

        // new TextInputFormat(null),文本輸入編碼格式,null表示默認(rèn)為utf-8編碼
        // FileProcessingMode.PROCESS_ONCE 只處理一次
        // 2000毫秒表示間隔處理時(shí)間
        DataStreamSource<String> lines1 = env.readFile(new TextInputFormat(null), path,
                FileProcessingMode.PROCESS_ONCE, 2000
        );
        // FileProcessingMode.PROCESS_CONTINUOUSLY 永續(xù)處理,不會(huì)停止
        DataStreamSource<String> lines2 = env.readFile(new TextInputFormat(null), path,
                FileProcessingMode.PROCESS_CONTINUOUSLY, 2000
        );

        // 查看并行度
        System.out.println("lines1的并行度:"+lines1.getParallelism());
        System.out.println("lines2的并行度:"+lines2.getParallelism());

        //lines1.print();
        lines2.print();
        env.execute();
    }
}
1.1.3 基于 Socket 的 Source
  • 現(xiàn)象:socket 的并行度是 1(單并行度數(shù)據(jù)源)
  • 細(xì)節(jié):在虛擬機(jī)上用 nc -lk 8888 啟動(dòng) socket 服務(wù)端
package cn.itcast.day02.source;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author lql
 * @time 2024-02-13 16:00:47
 * @description TODO:基于socket的數(shù)據(jù)源
 */
public class StreamSocketSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        int parallelism0 = env.getParallelism();
        System.out.println("執(zhí)行環(huán)境默認(rèn)的并行度:" + parallelism0);
        DataStreamSource<String> lines = env.socketTextStream("192.168.88.161", 8888);
        int parallelism1 = lines.getParallelism();
        System.out.println("SocketSource的并行度:" + parallelism1);
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        int parallelism2 = words.getParallelism();
        System.out.println("調(diào)用完FlatMap后DataStream的并行度:" + parallelism2);
        words.print();
        env.execute();
    }
}
1.2 自定義 Source
1.2.1 基于隨機(jī)生成DataSource
  • (1) 自定義實(shí)現(xiàn) SourceFunction 接口
    • 例子:自定義數(shù)據(jù)源, 每1秒鐘隨機(jī)生成一條訂單信息(訂單ID、用戶ID、訂單金額、時(shí)間戳)
    • 要求: 隨機(jī)生成訂單ID(UUID),用戶ID(0-2),訂單金額(0-100),時(shí)間戳為當(dāng)前系統(tǒng)時(shí)間
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * @author lql
 * @time 2024-02-13 16:21:31
 * @description TODO
 */
public class CustomerSourceWithoutParallelDemo {

    /**
     * 自定義 java Bean 類
     *     @Data:自動(dòng)為類生成 getter、setter 方法、toString 方法、equals 方法和 hashCode 方法。
     *     @AllArgsConstructor:自動(dòng)生成一個(gè)包含所有參數(shù)的構(gòu)造函數(shù)。
     *     @NoArgsConstructor:自動(dòng)生成一個(gè)無參構(gòu)造函數(shù)。
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order{
        // 訂單
        private String id;
        // 用戶 ID
        private String userId;
        // 訂單金額
        private int money;
        // 時(shí)間戳
        private Long timestamp;
    }

    /**
     * 主函數(shù)
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //todo 1)獲取flink流處理的運(yùn)行環(huán)境
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        System.out.println("初始化環(huán)境的并行度:"+ env.getParallelism());

        // todo 2) 接入自定義數(shù)據(jù)源
        DataStreamSource<Order> streamSource = env.addSource(new MySource());
        System.out.println("streamSource并行度: " + streamSource.getParallelism());
        
        // todo 3) 打印輸出
        streamSource.printToErr();
        env.execute();
    }
    
    /**
     * 自定義數(shù)據(jù)源,每秒鐘生成一個(gè)訂單信息
     */
    private static class MySource implements SourceFunction<Order> {
        // 定義循環(huán)生成數(shù)據(jù)的標(biāo)記
        private boolean isRunning = true;
        /**
         * 核心方法:生成數(shù)據(jù)
         */
        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning){
                // 訂單ID
                String orderID = UUID.randomUUID().toString();
                // 用戶 Id
                String userID = String.valueOf(random.nextInt(3));
                // 訂單金額
                int money = random.nextInt(1000);
                // 時(shí)間
                long time = System.currentTimeMillis();
                // 返回?cái)?shù)據(jù)
                sourceContext.collect(new Order(orderID, userID, money, time));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

結(jié)果:默認(rèn)運(yùn)行環(huán)境的并行度:8, 自定義streamSource的并行度為:1

總結(jié):文章來源地址http://www.zghlxwxcb.cn/news/detail-826946.html

  • 1- env.addSource(new MySource()),自定義數(shù)據(jù)源 [私有靜態(tài)方法]

    • new 一個(gè) 實(shí)現(xiàn)(implements) SourceFunction 接口,并重寫核心方法
  • 2- 認(rèn)識了 java bean 類,@Data,@NoArgsConstructor,@AllArgsConstructor 的作用

  • 3- UUID 這個(gè)工具類可以隨機(jī)生成 id,隨機(jī)數(shù)使用需要先 new 一個(gè),random.nextInt() 是左閉右開

  • 4- String.valuesOf()是可以生成字符串類型,while 循環(huán)需要有 boolean 標(biāo)記

  • 5- collect()可以返回對象數(shù)據(jù)

  • (2) 實(shí)現(xiàn)ParallelSourceFunction創(chuàng)建可并行Source

DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);

# 上述非rich的自定義mySource數(shù)據(jù)源不支持多個(gè)并行度
  • (3) 實(shí)現(xiàn)RichParallelSourceFunction:創(chuàng)建并行并帶有Rich功能的Source
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * @author lql
 * @time 2024-02-13 16:58:49
 * @description TODO:多并行度的自定義數(shù)據(jù)源
 */
public class RichParallelismDemo {
    /**
     * 自定義 java Bean 類
     *
     * @Data:自動(dòng)為類生成 getter、setter 方法、toString 方法、equals 方法和 hashCode 方法。
     * @AllArgsConstructor:自動(dòng)生成一個(gè)包含所有參數(shù)的構(gòu)造函數(shù)。
     * @NoArgsConstructor:自動(dòng)生成一個(gè)無參構(gòu)造函數(shù)。
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        // 訂單
        private String id;
        // 用戶 ID
        private String userId;
        // 訂單金額
        private int money;
        // 時(shí)間戳
        private Long timestamp;
    }

    /**
     * 主函數(shù)
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //todo 1)獲取flink流處理的運(yùn)行環(huán)境
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        System.out.println("初始化環(huán)境的并行度:" + env.getParallelism());

        // todo 2) 接入自定義數(shù)據(jù)源
        DataStreamSource<Order> streamSource = env.addSource(new MySource());
        streamSource = streamSource;
        System.out.println("streamSource并行度: " + streamSource.getParallelism());

        // todo 3) 打印輸出
        streamSource.printToErr();
        env.execute();

    }

    /**
     * 自定義數(shù)據(jù)源,每秒鐘生成一個(gè)訂單信息
     */
    private static class MySource extends RichParallelSourceFunction<Order> {
        // 定義循環(huán)生成數(shù)據(jù)的標(biāo)記
        private boolean isRunning = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void cancel() {

        }

        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            Random random = new Random();
            while (isRunning) {
                // 訂單ID
                String orderID = UUID.randomUUID().toString();
                // 用戶 Id
                String userID = String.valueOf(random.nextInt(3));
                // 訂單金額
                int money = random.nextInt(1000);
                // 時(shí)間
                long time = System.currentTimeMillis();
                // 返回?cái)?shù)據(jù)
                sourceContext.collect(new Order(orderID, userID, money, time));

            }
        }
    }
}

結(jié)果:自定義RichParallelSourceFunction支持多個(gè)并行度

總結(jié):繼承 RichParallelSourceFunction 方法,需要重寫方法 open 和 close !

1.2.2 基于 MySQL 的 Source 操作
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (
  `id` int(11) NOT NULL,
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壯');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');

SET FOREIGN_KEY_CHECKS = 1;
package cn.itcast.day02.source.custom;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-02-13 17:14:06
 * @description TODO:自定義 mysql 數(shù)據(jù)源
 */
public class MysqlSource {
    public static void main(String[] args) throws Exception {
        // TODO 1: 獲取 flink 流處理環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2: 接入自定義數(shù)據(jù)源
        DataStreamSource<UserInfo> streamSource = env.addSource(new MysqlSourceFunction());
        System.out.println("MysqlSourceFunction的并行度為:"+streamSource.getParallelism());

        // todo 3) 打印輸出
        streamSource.print();

        // todo 4) 啟動(dòng)運(yùn)行作業(yè)
        env.execute();

    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserInfo{
        private int id;
        private String username;
        private String password;
        private String name;
    }

    /**
     * 自定義數(shù)據(jù)源:獲取 mysql 數(shù)據(jù)
     */
    private static class MysqlSourceFunction extends RichParallelSourceFunction<UserInfo> {
        // 定義 mysql 的連接對象
        private Connection connection = null;

        // 定義 mysql statement 對象
        private PreparedStatement statement = null;

        /**
         * 實(shí)例化的時(shí)候會(huì)被執(zhí)行一次,多個(gè)并行度會(huì)執(zhí)行多次,因?yàn)橛卸鄠€(gè)實(shí)例
         * 一般由于資源的初始化操作
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 注冊驅(qū)動(dòng)
            Class.forName("com.mysql.jdbc.Driver");
            // 實(shí)例化 mysql 的連接對象
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "root");
            // 實(shí)例化 statement 對象
            statement = connection.prepareStatement("select * from test.user");
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void run(SourceContext<UserInfo> sourceContext) throws Exception {
            while(true){
                ResultSet resultSet = statement.executeQuery();
                while(resultSet.next()) {
                    int id = resultSet.getInt("id");
                    String username = resultSet.getString("username");
                    String password = resultSet.getString("password");
                    String name = resultSet.getString("name");

                    sourceContext.collect(new UserInfo(id,username,password,name));
                }
                resultSet.close();
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {

        }
    }
}

結(jié)果:mysql 的自定義 source,可以多并行度

總結(jié):

  • 1- java Bean 類,給 mysql 字段名定義用
  • 2- 初始化 mysql 連接對象 connection 和 statement 記為null
  • 3- 重寫 open 驅(qū)動(dòng)方法:
    • 注冊 mysql 驅(qū)動(dòng):Class.forName(“com.mysql.jdbc.Driver”)
    • 實(shí)例化連接對象 connection:DriverManager.getConnection()
    • 實(shí)例化 statement:connection.prepareStatement(),這里放置 sql 查詢語句
  • 4- 重寫 run 核心方法:
    • 雙重循環(huán),第一層:結(jié)果集關(guān)閉和停頓間隔,第二層:statement.executeQuery()獲取結(jié)果集,字段類型和內(nèi)容獲取
    • 獲取完字段后,需要collect(new 實(shí)體類(字段集))
  • 5- 睡眠時(shí)間:TimeUnit.SECONDS.sleep()

到了這里,關(guān)于flink重溫筆記(二):Flink 流批一體 API 開發(fā)——Source 數(shù)據(jù)源操作的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 4、介紹Flink的流批一體、transformations的18種算子詳細(xì)介紹、Flink與Kafka的source、sink介紹

    4、介紹Flink的流批一體、transformations的18種算子詳細(xì)介紹、Flink與Kafka的source、sink介紹

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月13日
    瀏覽(60)
  • Flink流批一體計(jì)算(10):PyFlink Tabel API

    簡述 PyFlink 是 Apache Flink 的 Python API ,你可以使用它構(gòu)建可擴(kuò)展的批處理和流處理任務(wù),例如實(shí)時(shí)數(shù)據(jù)處理管道、大規(guī)模探索性數(shù)據(jù)分析、機(jī)器學(xué)習(xí)( ML )管道和 ETL 處理。 如果你對 Python 和 Pandas 等庫已經(jīng)比較熟悉,那么 PyFlink 可以讓你更輕松地利用 Flink 生態(tài)系統(tǒng)的全部功

    2024年02月11日
    瀏覽(27)
  • Flink流批一體計(jì)算(16):PyFlink DataStream API

    Flink流批一體計(jì)算(16):PyFlink DataStream API

    目錄 概述 Pipeline Dataflow 代碼示例WorldCount.py 執(zhí)行腳本W(wǎng)orldCount.py 概述 Apache Flink 提供了 DataStream API,用于構(gòu)建健壯的、有狀態(tài)的流式應(yīng)用程序。它提供了對狀態(tài)和時(shí)間細(xì)粒度控制,從而允許實(shí)現(xiàn)高級事件驅(qū)動(dòng)系統(tǒng)。 用戶實(shí)現(xiàn)的Flink程序是由Stream和Transformation這兩個(gè)基本構(gòu)建塊組

    2024年02月11日
    瀏覽(25)
  • Flink流批一體計(jì)算(20):DataStream API和Table API互轉(zhuǎn)

    目錄 舉個(gè)例子 連接器 下載連接器(connector)和格式(format)jar 包 依賴管理 ?如何使用連接器 舉個(gè)例子 StreamExecutionEnvironment 集成了DataStream API,通過額外的函數(shù)擴(kuò)展了TableEnvironment。 下面代碼演示兩種API如何互轉(zhuǎn) TableEnvironment 將采用StreamExecutionEnvironment所有的配置選項(xiàng)。 建

    2024年02月10日
    瀏覽(24)
  • Flink流批一體計(jì)算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目錄 StreamExecutionEnvironment Watermark watermark策略簡介 使用 Watermark 策略 內(nèi)置水印生成器 處理空閑數(shù)據(jù)源 算子處理 Watermark 的方式 創(chuàng)建DataStream的方式 通過list對象創(chuàng)建 ??????使用DataStream connectors創(chuàng)建 使用Table SQL connectors創(chuàng)建 StreamExecutionEnvironment 編寫一個(gè) Flink Python DataSt

    2024年02月11日
    瀏覽(54)
  • Flink流批一體計(jì)算(12):PyFlink Tabel API之構(gòu)建作業(yè)

    目錄 1.創(chuàng)建源表和結(jié)果表。 創(chuàng)建及注冊表名分別為 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通過 DDL 語句來注冊源表和結(jié)果表 2. 創(chuàng)建一個(gè)作業(yè) 3. 提交作業(yè)Submitting PyFlink Jobs 1.創(chuàng)建源表和結(jié)果表。 創(chuàng)建及注冊表名分別為 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    瀏覽(21)
  • Flink流批一體計(jì)算(11):PyFlink Tabel API之TableEnvironment

    目錄 概述 設(shè)置重啟策略 什么是flink的重啟策略(Restartstrategy) flink的重啟策略(Restartstrategy)實(shí)戰(zhàn) flink的4種重啟策略 FixedDelayRestartstrategy(固定延時(shí)重啟策略) FailureRateRestartstrategy(故障率重啟策略) NoRestartstrategy(不重啟策略) 配置State Backends 以及 Checkpointing Checkpoint 啟用和配置

    2024年02月13日
    瀏覽(46)
  • Flink流批一體計(jì)算(19):PyFlink DataStream API之State

    目錄 keyed state Keyed DataStream 使用 Keyed State 實(shí)現(xiàn)了一個(gè)簡單的計(jì)數(shù)窗口 狀態(tài)有效期 (TTL) 過期數(shù)據(jù)的清理 全量快照時(shí)進(jìn)行清理 增量數(shù)據(jù)清理 在 RocksDB 壓縮時(shí)清理 Operator State算子狀態(tài) Broadcast State廣播狀態(tài) keyed state Keyed DataStream 使用 keyed state,首先需要為DataStream指定 key(主鍵)

    2024年02月10日
    瀏覽(43)
  • Flink流批一體計(jì)算(14):PyFlink Tabel API之SQL查詢

    舉個(gè)例子 查詢 source 表,同時(shí)執(zhí)行計(jì)算 Table API 查詢 Table 對象有許多方法,可以用于進(jìn)行關(guān)系操作。 這些方法返回新的 Table 對象,表示對輸入 Table 應(yīng)用關(guān)系操作之后的結(jié)果。 這些關(guān)系操作可以由多個(gè)方法調(diào)用組成,例如 table.group_by(...).select(...)。 Table API 文檔描述了流和批

    2024年02月12日
    瀏覽(23)
  • Flink流批一體計(jì)算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 創(chuàng)建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用來: ·創(chuàng)建 Table ·將 Table 注冊成臨時(shí)表 ·執(zhí)行 SQL 查詢 ·注冊用戶自定義的 (標(biāo)量,表值,或者聚合) 函數(shù) ·配置作業(yè) ·管理 Python 依賴 ·提交作業(yè)執(zhí)行 創(chuàng)建 source 表 創(chuàng)建 sink

    2024年02月12日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包