国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink重溫筆記(六):Flink 流批一體 API 開發(fā)—— 數(shù)據(jù)輸出 sink

這篇具有很好參考價值的文章主要介紹了flink重溫筆記(六):Flink 流批一體 API 開發(fā)—— 數(shù)據(jù)輸出 sink。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Flink學習筆記

前言:今天是學習 flink 的第七天啦!學習了 flink 中 sink(數(shù)據(jù)槽) 部分知識點,這一部分只要是解決數(shù)據(jù)處理之后,數(shù)據(jù)到哪里去的問題,我覺得 flink 知識點雖然比較難理解,但是代碼跑通后,邏輯還是比較有趣的!

Tips:毛爺爺說過:“宜將剩勇追窮寇,不可沽名學霸王!”明天周日除了復習前面知識點之外,也要繼續(xù)努力學習接下來的知識點,繼續(xù)加油!

二、Flink 流批一體 API 開發(fā)

4. 數(shù)據(jù)輸出 Sink

4.1 print 打印

打印是最簡單的一個Sink,通常是用來做實驗和測試時使用。

實例:socket 數(shù)據(jù)源,查看進程編號最終輸出 sink 之 print 打印

package cn.itcast.day06.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-17 22:27:48
 * @description TODO:print
 */
public class PrintSinkDemo {
    public static void main(String[] args) throws Exception {
        //local模式默認的并行度是當前機器的邏輯核的數(shù)量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        int parallelism0 = env.getParallelism();
        System.out.println("執(zhí)行環(huán)境默認的并行度:" + parallelism0);
        // socket 數(shù)據(jù)源
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // 獲取 lines 數(shù)據(jù)源并行度
        int parallelism = lines.getParallelism();
        System.out.println("SocketSource的并行度:" + parallelism);

        lines.print();
        lines.addSink(new MyPrintSink()).name("my-print-sink");
        env.execute();
    }

    private static class MyPrintSink extends RichSinkFunction<String> {
        // 這一處定義很重要,不然 indexOfThisSubtask 只能在一個方法中使用!
        private int indexOfThisSubtask;
        @Override
        public void open(Configuration parameters) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(indexOfThisSubtask + 1 + "> " + value);
        }
    }
}

結(jié)果:

執(zhí)行環(huán)境默認的并行度:8
SocketSource的并行度:1
6> hadoop
1> hadoop
1> hadoop
7> hadoop

總結(jié):

  • 打印輸出,也是一種 sink
4.2 writeAsText 以文本格式輸出

該方法是將數(shù)據(jù)以文本格式實時的寫入到指定的目錄中,本質(zhì)上使用的是 TextOutputFormat 格式寫入的。

實例:socket 數(shù)據(jù)源,將數(shù)據(jù)輸出到文本 Text 中

package cn.itcast.day06.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-17 22:40:48
 * @description TODO:writeAsText
 */
public class WriteSinkDemo {
    public static void main(String[] args) throws Exception {
        //local模式默認的并行度是當前機器的邏輯核的數(shù)量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        int parallelism0 = env.getParallelism();
        System.out.println("執(zhí)行環(huán)境默認的并行度:" + parallelism0);
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        //獲取DataStream的并行度
        int parallelism = lines.getParallelism();
        System.out.println("SocketSource的并行度:" + parallelism);

        lines.writeAsText("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output",FileSystem.WriteMode.OVERWRITE);
        env.execute();

    }
}

結(jié)果:

output 文件夾下出現(xiàn)以數(shù)字命名的文件
內(nèi)容為 socket 數(shù)據(jù)源輸出,加上了 \n 換行符
目錄中的文件名稱是該 Sink 所在 subtask 的 Index + 1

總結(jié):

  • 1- writeAsText 輸出數(shù)據(jù)以小文件方式,文件命名為 subtask 的 Index + 1
  • 2- FileSystem.WriteMode 有兩種,一種是 OVERWRITE,可以覆蓋同名文件,一種是 NO_OVERWRITE,同名文件就報錯
4.3 writeAsCsv 以 csv 格式輸出

該方法是將數(shù)據(jù)以 csv 格式寫入到指定的目錄中,本質(zhì)上使用的是 CsvOutputFormat 格式寫入的。

實例:socket 數(shù)據(jù)源,將數(shù)據(jù)輸出到文本 csv 中

package cn.itcast.day06.sink;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.core.fs.FileSystem;
/**
 * @author lql
 * @time 2024-02-17 22:52:12
 * @description TODO:將DataSet數(shù)據(jù)寫入到csv文件中
 */
public class CsvSink {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //需先建立文件
        String filePath = "D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\user.csv";
        //添加數(shù)據(jù)
        Tuple7<Integer, String, Integer, Integer, String, String, Long> row = new Tuple7<>(15, "zhangsan", 40, 1, "CN", "2020-09-08 00:00:00", 1599494400000L);

        //轉(zhuǎn)換為dataSet,利用 數(shù)據(jù)源中 fromElements 可以接受 [列表或元組] 的屬性
        DataSource<Tuple7<Integer, String, Integer, Integer, String, String, Long>> dataSet = (DataSource<Tuple7<Integer, String, Integer, Integer, String, String, Long>>) env.fromElements(row);

        //將內(nèi)容寫入到File中,如果文件已存在,將會被復蓋
        dataSet.writeAsCsv(filePath,FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        env.execute();
    }
}

結(jié)果:

在指定文件中,生成了 csv 數(shù)據(jù)

總結(jié):

  • 1- 首先需要定義數(shù)據(jù)存放的位置,精確到 .scv
  • 2- 最終需要將并行度設(shè)置為 1,才能生成一個完整的文件
4.4 writeUsingOutputFormat 指定格式輸出

該方法是將數(shù)據(jù)已指定的格式寫入到指定目錄中

package cn.itcast.day06.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.core.fs.Path;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-17 23:03:24
 * @description TODO:將數(shù)據(jù)已指定的格式寫入到指定目錄中
 */
public class writeUsingOutputFormatSink {
    public static void main(String[] args) throws Exception {
        //1:獲取流處理運行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //調(diào)用env的fromElements創(chuàng)建一個非并行的DataStreamSource
        DataStreamSource<String> words = env.fromElements(
                "hadoop","spark","flink","hbase","flink","spark"
        );

        // 對拆分后的單詞,每個單詞記一次數(shù)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);

        result.writeUsingOutputFormat(new TextOutputFormat<>(new Path("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\wordcount")));
        env.execute();

    }
}

結(jié)果:

在指定目錄下,生成 n(電腦并行度數(shù)量) 個文本文件

總結(jié):

  • 1- writeAsText 和 writeAsCsv 方法底層都是調(diào)用了 writeUsingOutputFormat 方法
  • 2- 這種方法更加靈活
4.5 writeToSocket 輸出到網(wǎng)絡(luò)端口

該方法是將數(shù)據(jù)輸出到指定的Socket網(wǎng)絡(luò)地址端口。

實例:socket 數(shù)據(jù)源,node1:9999 寫數(shù)據(jù)到 node1:8888

package cn.itcast.day06.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-17 23:12:03
 * @description TODO:writeToSocket
 */
public class WriteToSocketDemo {
    public static void main(String[] args) throws Exception {
        //local模式默認的并行度是當前機器的邏輯核的數(shù)量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        int parallelism0 = env.getParallelism();
        System.out.println("執(zhí)行環(huán)境默認的并行度:" + parallelism0);
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
        
        //獲取DataStream的并行度
        int parallelism = lines.getParallelism();
        System.out.println("SocketSource的并行度:" + parallelism);
        
        // 第三個參數(shù)是數(shù)據(jù)輸出的序列化格式 SerializationSchema
        lines.writeToSocket("node1",8888,new SimpleStringSchema());
        env.execute();
    }
}

結(jié)果:

node1:8888 實時接收到 node1:9999 寫入的數(shù)據(jù)

總結(jié):

  • 端口號需要提前開啟
4.6 基于本地集合的 Sink

數(shù)據(jù)分類集合輸出

實例:數(shù)據(jù)打印輸出,error 輸出,可以輸出到:Stdout,Stderr,采集為本地集合

package cn.itcast.day06.sink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-17 23:18:12
 * @description TODO:數(shù)據(jù)可以輸出到:Stdout,Stderr,采集為本地集合
 */
public class CollectionDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<Integer, String>> dataSource = env.fromElements(
                Tuple2.of(1, "zhangsan"),
                Tuple2.of(2, "lisi"),
                Tuple2.of(3, "wangwu"),
                Tuple2.of(4, "zhaoliu")
        );

        //2.sink
        dataSource.print();
        dataSource.printToErr();

        env.execute();
    }
}

結(jié)果:

黑色字體輸出:
6> (3,wangwu)
7> (4,zhaoliu)
4> (1,zhangsan)
5> (2,lisi)

紅色字體輸出:
8> (3,wangwu)
7> (2,lisi)
1> (4,zhaoliu)
6> (1,zhangsan)

總結(jié):文章來源地址http://www.zghlxwxcb.cn/news/detail-833949.html

  • 1- printToErr 可以進行分類輸出
  • 2- 并行度是1 能輸出文件
  • 3- 并行度是n 能輸出文件夾

到了這里,關(guān)于flink重溫筆記(六):Flink 流批一體 API 開發(fā)—— 數(shù)據(jù)輸出 sink的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink流批一體計算(16):PyFlink DataStream API

    Flink流批一體計算(16):PyFlink DataStream API

    目錄 概述 Pipeline Dataflow 代碼示例WorldCount.py 執(zhí)行腳本W(wǎng)orldCount.py 概述 Apache Flink 提供了 DataStream API,用于構(gòu)建健壯的、有狀態(tài)的流式應(yīng)用程序。它提供了對狀態(tài)和時間細粒度控制,從而允許實現(xiàn)高級事件驅(qū)動系統(tǒng)。 用戶實現(xiàn)的Flink程序是由Stream和Transformation這兩個基本構(gòu)建塊組

    2024年02月11日
    瀏覽(25)
  • Flink流批一體計算(10):PyFlink Tabel API

    簡述 PyFlink 是 Apache Flink 的 Python API ,你可以使用它構(gòu)建可擴展的批處理和流處理任務(wù),例如實時數(shù)據(jù)處理管道、大規(guī)模探索性數(shù)據(jù)分析、機器學習( ML )管道和 ETL 處理。 如果你對 Python 和 Pandas 等庫已經(jīng)比較熟悉,那么 PyFlink 可以讓你更輕松地利用 Flink 生態(tài)系統(tǒng)的全部功

    2024年02月11日
    瀏覽(27)
  • Flink流批一體計算(20):DataStream API和Table API互轉(zhuǎn)

    目錄 舉個例子 連接器 下載連接器(connector)和格式(format)jar 包 依賴管理 ?如何使用連接器 舉個例子 StreamExecutionEnvironment 集成了DataStream API,通過額外的函數(shù)擴展了TableEnvironment。 下面代碼演示兩種API如何互轉(zhuǎn) TableEnvironment 將采用StreamExecutionEnvironment所有的配置選項。 建

    2024年02月10日
    瀏覽(24)
  • Flink流批一體計算(17):PyFlink DataStream API之StreamExecutionEnvironment

    目錄 StreamExecutionEnvironment Watermark watermark策略簡介 使用 Watermark 策略 內(nèi)置水印生成器 處理空閑數(shù)據(jù)源 算子處理 Watermark 的方式 創(chuàng)建DataStream的方式 通過list對象創(chuàng)建 ??????使用DataStream connectors創(chuàng)建 使用Table SQL connectors創(chuàng)建 StreamExecutionEnvironment 編寫一個 Flink Python DataSt

    2024年02月11日
    瀏覽(55)
  • Flink流批一體計算(11):PyFlink Tabel API之TableEnvironment

    目錄 概述 設(shè)置重啟策略 什么是flink的重啟策略(Restartstrategy) flink的重啟策略(Restartstrategy)實戰(zhàn) flink的4種重啟策略 FixedDelayRestartstrategy(固定延時重啟策略) FailureRateRestartstrategy(故障率重啟策略) NoRestartstrategy(不重啟策略) 配置State Backends 以及 Checkpointing Checkpoint 啟用和配置

    2024年02月13日
    瀏覽(47)
  • Flink流批一體計算(12):PyFlink Tabel API之構(gòu)建作業(yè)

    目錄 1.創(chuàng)建源表和結(jié)果表。 創(chuàng)建及注冊表名分別為 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通過 DDL 語句來注冊源表和結(jié)果表 2. 創(chuàng)建一個作業(yè) 3. 提交作業(yè)Submitting PyFlink Jobs 1.創(chuàng)建源表和結(jié)果表。 創(chuàng)建及注冊表名分別為 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    瀏覽(21)
  • Flink流批一體計算(19):PyFlink DataStream API之State

    目錄 keyed state Keyed DataStream 使用 Keyed State 實現(xiàn)了一個簡單的計數(shù)窗口 狀態(tài)有效期 (TTL) 過期數(shù)據(jù)的清理 全量快照時進行清理 增量數(shù)據(jù)清理 在 RocksDB 壓縮時清理 Operator State算子狀態(tài) Broadcast State廣播狀態(tài) keyed state Keyed DataStream 使用 keyed state,首先需要為DataStream指定 key(主鍵)

    2024年02月10日
    瀏覽(43)
  • Flink流批一體計算(14):PyFlink Tabel API之SQL查詢

    舉個例子 查詢 source 表,同時執(zhí)行計算 Table API 查詢 Table 對象有許多方法,可以用于進行關(guān)系操作。 這些方法返回新的 Table 對象,表示對輸入 Table 應(yīng)用關(guān)系操作之后的結(jié)果。 這些關(guān)系操作可以由多個方法調(diào)用組成,例如 table.group_by(...).select(...)。 Table API 文檔描述了流和批

    2024年02月12日
    瀏覽(23)
  • Flink流批一體計算(13):PyFlink Tabel API之SQL DDL

    1. TableEnvironment 創(chuàng)建 TableEnvironment TableEnvironment 是 Table API 和 SQL 集成的核心概念。 TableEnvironment 可以用來: ·創(chuàng)建 Table ·將 Table 注冊成臨時表 ·執(zhí)行 SQL 查詢 ·注冊用戶自定義的 (標量,表值,或者聚合) 函數(shù) ·配置作業(yè) ·管理 Python 依賴 ·提交作業(yè)執(zhí)行 創(chuàng)建 source 表 創(chuàng)建 sink

    2024年02月12日
    瀏覽(23)
  • Flink流批一體計算(18):PyFlink DataStream API之計算和Sink

    Flink流批一體計算(18):PyFlink DataStream API之計算和Sink

    目錄 1. 在上節(jié)數(shù)據(jù)流上執(zhí)行轉(zhuǎn)換操作,或者使用 sink 將數(shù)據(jù)寫入外部系統(tǒng)。 2. File Sink File Sink Format Types? Row-encoded Formats? Bulk-encoded Formats? 桶分配 滾動策略 3. 如何輸出結(jié)果 Print 集合數(shù)據(jù)到客戶端,execute_and_collect方法將收集數(shù)據(jù)到客戶端內(nèi)存 將結(jié)果發(fā)送到DataStream sink conne

    2024年02月11日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包