国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

4.2、Flink任務(wù)怎樣讀取文件中的數(shù)據(jù)

這篇具有很好參考價值的文章主要介紹了4.2、Flink任務(wù)怎樣讀取文件中的數(shù)據(jù)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

1、前言

2、readTextFile(已過時,不推薦使用)

3、readFile(已過時,不推薦使用)

4、fromSource(FileSource) 推薦使用


1、前言

思考: 讀取文件時可以設(shè)置哪些規(guī)則呢?

?????????1. 文件的格式(txt、csv、二進(jìn)制...)????????

? ? ? ? ?2. 文件的分隔符(按\n 分割)

? ? ? ? ?3. 是否需要監(jiān)控文件變化(一次讀取、持續(xù)讀取)

基于以上規(guī)則,F(xiàn)link為我們提供了非常靈活的 讀取文件的方法


2、readTextFile(已過時,不推薦使用)

語法說明:

定義:
    def readTextFile(filePath: String): DataStream[String]
    def readTextFile(filePath: String, charsetName: String)

功能:
    1.讀取文本格式的文件
    2.按行讀取(\n為分隔符),每行數(shù)據(jù)被封裝為 DataStream 的一個元素
    3.可以指定字符集(默認(rèn)為UDF-8)
    4.文件只會讀取一次

源碼分析:
    public DataStreamSource<String> readTextFile(String filePath, String charsetName) {

        // 初始化 TextInputFormat對象
        TextInputFormat format = new TextInputFormat(new Path(filePath));  
        // 指定路徑過濾器(使用默認(rèn)過濾器)
        format.setFilesFilter(FilePathFilter.createDefaultFilter());  
        // 指定Flink中的數(shù)據(jù)類型    
        TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; 
        // 指定字符集
        format.setCharsetName(charsetName);     
                                   
        // 調(diào)用 readFile 方法
        return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); 
    }

代碼示例:

    public static void readTextFile() throws Exception {
        /*
         * TODO 功能說明
         *   readTextFile(path) - 讀取文本文件(一次讀取),例如遵守 TextInputFormat 規(guī)范的文件,逐行讀取并將它們作為字符串返回。
         * */
        // 1.獲取執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.將文本文件作為數(shù)據(jù)源
        env.readTextFile("data/1.txt").setParallelism(4).print();

        // 3.觸發(fā)程序執(zhí)行
        env.execute();
    }

3、readFile(已過時,不推薦使用)

語法說明:

定義:
    def readFile[T: TypeInformation](
        inputFormat: FileInputFormat[T],
        filePath: String,
        watchType: FileProcessingMode,
        interval: Long): DataStream[T] = {
      val typeInfo = implicitly[TypeInformation[T]] // 隱私轉(zhuǎn)換(將java 數(shù)據(jù)類型 轉(zhuǎn)換為 Flink數(shù)據(jù)類型)
      asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))
    }

參數(shù):
    inputFormat : 指定 FileInputFormat 實現(xiàn)類(根據(jù)文件類型 選擇相適應(yīng)的實例)
    filePath    : 指定 文件路徑
    watchType   : 指定 讀取模式(提供了2個枚舉值)
                       PROCESS_ONCE :只讀取一次
                       PROCESS_CONTINUOUSLY :按照指定周期掃描文件
    interval    : 指定 掃描文件的周期(單位為毫秒)

功能:
    按照 指定的 文件格式 和 讀取方式 讀取數(shù)據(jù)
4.2、Flink任務(wù)怎樣讀取文件中的數(shù)據(jù),# Flink API 使用技巧,flink,python,前端
FileInputFormat 的實現(xiàn)類

代碼示例:文章來源地址http://www.zghlxwxcb.cn/news/detail-635748.html

    public static void readFile() throws Exception {
        /*
         * TODO 功能說明
         *    readFile(fileInputFormat, path) - 按照指定的文件輸入格式讀取(一次)文件。
         *    readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
         *       按照指定的文件輸入格式讀?。ǔ掷m(xù)的讀?。┪募?         * */

        // 1.獲取執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.將文本文件作為數(shù)據(jù)源
        String filePath = "data/1.txt";

        TextInputFormat textInputFormat = new TextInputFormat(new Path(filePath));
        textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定過濾器
        textInputFormat.setCharsetName("UTF-8"); // 指定編碼格式

        /*
         * readFile(inputFormat: FileInputFormat[OUT], filePath: String, watchType: FileProcessingMode, interval: Long)
         * 參數(shù)說明:
         *      @inputFormat : 指定文件輸入格式
         *      @filePath    : 指定文件路徑
         *      @watchType   : 指定監(jiān)控類型,提供了兩種讀取策略
         *            PROCESS_ONCE : 只讀取一次
         *            PROCESS_CONTINUOUSLY :持續(xù)讀取,監(jiān)控新增數(shù)據(jù)
         *      @interval : 指定連續(xù)掃描文件的周期(毫秒)
         * 重點提示:
         *      1.如果watchType設(shè)置為PROCESS_CONTINUOUSLY時,當(dāng)一個文件被修改時,將會導(dǎo)致重新讀取該
         *           文件的全部內(nèi)容,這將會打破`精確一次`的語義
         * */
        env.readFile(
                textInputFormat
                , filePath
                , FileProcessingMode.PROCESS_CONTINUOUSLY
                , 1000
        ).print();

        // 3.觸發(fā)程序執(zhí)行
        env.execute();
    }

4、fromSource(FileSource) 推薦使用

    public static void FileSource() throws Exception {
        // 1.獲取執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.將文本文件作為數(shù)據(jù)源
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat()
                , new Path("data/1.txt")).build();

        env.fromSource(fileSource
                , WatermarkStrategy.noWatermarks()
                , "read fileSource"
        ).print();

        // 3.觸發(fā)程序執(zhí)行
        env.execute();
    }

到了這里,關(guān)于4.2、Flink任務(wù)怎樣讀取文件中的數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 讀取JSON文件 如何在Unity中讀取Json文件中的數(shù)據(jù)

    讀取JSON文件 如何在Unity中讀取Json文件中的數(shù)據(jù)

    Josn是一種輕量級的數(shù)據(jù)交換格式,JSON能夠描述四種簡單的類型(字符串、數(shù)字、布爾值及null)和兩種結(jié)構(gòu)化類型(對象及數(shù)組),在Unity里經(jīng)常用Json來處理大量的字符串,容易解析,效率非常快。 基本結(jié)構(gòu) 1、語法 數(shù)據(jù)存在鍵值對中 數(shù)據(jù)由逗號分隔 花括號保存對象 方括號保存

    2024年02月15日
    瀏覽(23)
  • Flink的API分層、架構(gòu)與組件原理、并行度、任務(wù)執(zhí)行計劃

    Flink的API分層、架構(gòu)與組件原理、并行度、任務(wù)執(zhí)行計劃

    ????????Apache Flink的API分為四個層次,每個層次都提供不同的抽象和功能,以滿足不同場景下的數(shù)據(jù)處理需求。下面是這四個層次的具體介紹: CEP API: Flink API 最底層的抽象為有狀態(tài)實時流處理。其抽象實現(xiàn)是Process Function,并且Process Function被 ?框架集成到了DataStream API中

    2024年02月05日
    瀏覽(21)
  • python實現(xiàn)讀取文件中的視頻數(shù)據(jù)并實時展示

    要實現(xiàn)讀取文件中的視頻數(shù)據(jù)并實時展示,可以使用OpenCV庫。以下是一個簡單的示例代碼: 在這個示例中,我們首先使用`cv2.VideoCapture()`函數(shù)打開視頻文件。然后,我們使用一個無限循環(huán)來逐幀讀取視頻,并在窗口中顯示當(dāng)前幀。最后,我們釋放資源并關(guān)閉窗口。注意,在循

    2024年02月12日
    瀏覽(20)
  • 用Python的pandas讀取excel文件中的數(shù)據(jù)

    用Python的pandas讀取excel文件中的數(shù)據(jù)

    hello呀!各位鐵子們大家好呀,今天呢來和大家聊一聊用Python的pandas讀取excel文件中的數(shù)據(jù)。 使用pandas的 read_excel() 方法,可通過文件路徑直接讀取。注意到,在一個excel文件中有多個sheet,因此,對excel文件的讀取實際上是讀取指定文件、并同時指定sheet下的數(shù)據(jù)??梢砸淮巫x

    2024年02月02日
    瀏覽(89)
  • 如何使用pandas讀取csv文件中的某一列數(shù)據(jù)

    使用pandas讀取csv文件中的某一列數(shù)據(jù),可以這樣做: 先導(dǎo)入pandas模塊: import pandas as pd 使用 pd.read_csv 函數(shù)讀取csv文件: df = pd.read_csv(\\\"文件名.csv\\\") 使用 df[\\\"列名\\\"] 讀取某一列數(shù)據(jù): column = df[\\\"列名\\\"] 例如,如果你有一個csv文件叫做 example.csv ,并且有一列叫做 age ,你可以這樣

    2024年02月13日
    瀏覽(112)
  • html5提供的FileReader是一種異步文件讀取文件中的數(shù)據(jù)

    前言:FileReader是一種異步文件讀取機制,結(jié)合input:file可以很方便的讀取本地文件。 input:file 在介紹FileReader之前,先簡單介紹input的file類型。 input type=\\\"file\\\" id=\\\"file\\\" input的file類型會渲染為一個按鈕和一段文字。點擊按鈕可打開文件選擇窗口,文字表示對文件的描述(大部分情

    2024年02月11日
    瀏覽(23)
  • 用bat 命令 修改sql文件中的數(shù)據(jù)庫名字 新的名字通過讀取配置文件中的字段獲取

    在批處理腳本中,如果新數(shù)據(jù)庫名存儲在配置文件(比如config.ini)中的某個字段內(nèi),可以按照以下步驟進(jìn)行: 假設(shè)你的配置文件內(nèi)容如下: 要讀取這個值并用于替換.sql文件中的舊數(shù)據(jù)庫名,請使用以下批處理腳本: 在這個腳本中,首先通過 findstr 和 for /f 命令組合讀取配置

    2024年02月02日
    瀏覽(25)
  • POI:從Excel文件中讀取數(shù)據(jù),向Excel文件中寫入數(shù)據(jù),將Excel表格中的數(shù)據(jù)插入數(shù)據(jù)庫,將數(shù)據(jù)庫中的數(shù)據(jù)添加到Excel表

    POI:從Excel文件中讀取數(shù)據(jù),向Excel文件中寫入數(shù)據(jù),將Excel表格中的數(shù)據(jù)插入數(shù)據(jù)庫,將數(shù)據(jù)庫中的數(shù)據(jù)添加到Excel表

    POI是Apache軟件基金會用Java編寫的免費開源的跨平臺的 Java API,Apache POI提供API給Java程序?qū)icrosoft Office格式檔案讀和寫的功能。POI為“Poor Obfuscation Implementation”的首字母縮寫,意為“可憐的模糊實現(xiàn)”。 所以POI的主要功能是可以用Java操作Microsoft Office的相關(guān)文件,但是一般我

    2024年02月10日
    瀏覽(26)
  • Flink(三)flink重要概念(api分層、角色、執(zhí)行流程、執(zhí)行圖和編程模型)及dataset、datastream詳細(xì)示例入門和提交任務(wù)至on yarn運行

    Flink(三)flink重要概念(api分層、角色、執(zhí)行流程、執(zhí)行圖和編程模型)及dataset、datastream詳細(xì)示例入門和提交任務(wù)至on yarn運行

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月16日
    瀏覽(27)
  • 【Flink】 Flink實時讀取mysql數(shù)據(jù)

    準(zhǔn)備 你需要將這兩個依賴添加到 pom.xml 中 mysql mysql-connector-java 8.0.0 讀取 kafka 數(shù)據(jù) 這里我依舊用的以前的 student 類,自己本地起了 kafka 然后造一些測試數(shù)據(jù),這里我們測試發(fā)送一條數(shù)據(jù)則 sleep 10s,意味著往 kafka 中一分鐘發(fā) 6 條數(shù)據(jù)。 package com.zhisheng.connectors.mysql.utils; impo

    2024年02月03日
    瀏覽(15)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包