国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink將數(shù)據(jù)寫入CSV文件后文件中沒有數(shù)據(jù)

這篇具有很好參考價(jià)值的文章主要介紹了Flink將數(shù)據(jù)寫入CSV文件后文件中沒有數(shù)據(jù)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

Flink中有一個(gè)過時(shí)的sink方法:writeAsCsv,這個(gè)方法是將數(shù)據(jù)寫入CSV文件中,有時(shí)候我們會(huì)發(fā)現(xiàn)程序啟動(dòng)后,打開文件查看沒有任何數(shù)據(jù),日志信息中也沒有任何報(bào)錯(cuò),這里我們結(jié)合源碼分析一下這個(gè)原因.

這里先看一下數(shù)據(jù)處理的代碼
代碼中我是使用的自定義數(shù)據(jù)源生產(chǎn)數(shù)據(jù)的方式,為了方便測(cè)試

import lombok.*;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/6/19
 * @Description: 自定義數(shù)據(jù)源測(cè)試
 **/
public class FlinkCustomizeSource {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建流環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 設(shè)置并行度
        env.setParallelism(1); // 這里的并行度設(shè)置為幾就會(huì)生成多少個(gè)csv文件
        // 添加自定義數(shù)據(jù)源
         DataStreamSource<CustomizeBean> dataStreamSource = env.addSource(new customizeSource());
        // 先將數(shù)據(jù)轉(zhuǎn)換成Tuple類型,這樣才能寫入csv中
        SingleOutputStreamOperator<Tuple4<String, Integer, String, String>> tuple4Stream = dataStreamSource.map(
                bean -> Tuple4.of(bean.getName(), bean.getAge(), bean.getGender(), bean.getHobbit())
        ).returns(new TypeHint<Tuple4<String, Integer, String, String>>() {});
        // 選擇csv類型的sink,模式使用的覆蓋
        tuple4Stream.writeAsCsv("/Users/xxx/data/testData/test.csv", FileSystem.WriteMode.OVERWRITE);
        env.execute();
    }
}

// 自定義數(shù)據(jù)源需要實(shí)現(xiàn)SourceFunction接口,注意這個(gè)接口是單機(jī)的數(shù)據(jù)源,如果是想自定義分布式的數(shù)據(jù)源需要集成RichParallelSourceFunction類
class customizeSource implements SourceFunction<CustomizeBean> {
    int flag;
    // Job執(zhí)行的線程
    @Override
    public void run(SourceContext ctx) throws Exception {
        /*這個(gè)方法里就是具體的數(shù)據(jù)邏輯,實(shí)際內(nèi)容要根據(jù)業(yè)務(wù)需求編寫,這里只是為了演示方便*/
        CustomizeBean customizeBean = new CustomizeBean();
        String[] genders = {"M", "W"};
        String[] hobbits = {"籃球運(yùn)動(dòng)愛好者", "釣魚愛好者", "乒乓球運(yùn)動(dòng)愛好者", "美食愛好者", "羽毛球運(yùn)動(dòng)愛好者", "天文知識(shí)愛好者", "旅游愛好者", "書法愛好者", "非遺文化愛好者", "網(wǎng)吧戰(zhàn)神"};
        while (flag != 100) {
            // 這里自定義的Bean作為數(shù)據(jù)源
            customizeBean.setAge(RandomUtils.nextInt(18, 80)); // 年齡
            customizeBean.setName("A-" + new Random().nextInt()); // 姓名
            customizeBean.setGender(genders[RandomUtils.nextInt(0, genders.length)]); // 性別
            customizeBean.setHobbit(hobbits[RandomUtils.nextInt(0, hobbits.length)]); // 愛好
            // 將數(shù)據(jù)收集
            ctx.collect(customizeBean);
            // 睡眠時(shí)間是為了控制數(shù)據(jù)生產(chǎn)的速度,演示效果更加明顯
            Thread.sleep(1000);
        }
    }

    // Job取消時(shí)就會(huì)調(diào)用cancel方法
    @Override
    public void cancel() {
        // flag為100時(shí)就會(huì)停止程序
        flag = 100;
    }
}

@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
class CustomizeBean{
    private String name;
    private int age;
    private String gender;
    private String hobbit;
}

上面的代碼中我們使用自定義數(shù)據(jù)源的方式(java bean[CustomizeBean]),通過設(shè)置Thread.sleep(1000)可以固定每秒生成一條數(shù)據(jù).這里我們先看一下存儲(chǔ)CSV文件的目錄
flink-csv,FLink,flink,java,大數(shù)據(jù)
通過上圖可以看到程序沒有啟動(dòng)時(shí),目錄是空的,這里我們啟動(dòng)一下程序
日志內(nèi)容如下

[2023-06-19 15:26:37,755]-[INFO] -org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader -3206 -org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader.load(StateChangelogStorageLoader.java:98).load(98) | Creating a changelog storage with name 'memory'.
[2023-06-19 15:26:37,766]-[INFO] -org.apache.flink.runtime.taskexecutor.TaskExecutor -3217 -org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:757).submitTask(757) | Received task Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203), deploy into slot with allocation id b691e34573507d585516decbedb36384.
[2023-06-19 15:26:37,768]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3219 -org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1080).transitionState(1080) | Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) switched from CREATED to DEPLOYING.
[2023-06-19 15:26:37,769]-[INFO] -org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl -3220 -org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl.markExistingSlotActive(TaskSlotTableImpl.java:388).markExistingSlotActive(388) | Activate slot b691e34573507d585516decbedb36384.
[2023-06-19 15:26:37,773]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3224 -org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:623).doRun(623) | Loading JAR files for task Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) [DEPLOYING].
[2023-06-19 15:26:37,788]-[INFO] -org.apache.flink.streaming.runtime.tasks.StreamTask -3239 -org.apache.flink.runtime.state.StateBackendLoader.loadFromApplicationOrConfigOrDefaultInternal(StateBackendLoader.java:257).loadFromApplicationOrConfigOrDefaultInternal(257) | No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4e1fcd2f
[2023-06-19 15:26:37,789]-[INFO] -org.apache.flink.runtime.state.StateBackendLoader -3240 -org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:315).fromApplicationOrConfigOrDefault(315) | State backend loader loads the state backend as HashMapStateBackend
[2023-06-19 15:26:37,789]-[INFO] -org.apache.flink.streaming.runtime.tasks.StreamTask -3240 -org.apache.flink.runtime.state.CheckpointStorageLoader.createJobManagerCheckpointStorage(CheckpointStorageLoader.java:274).createJobManagerCheckpointStorage(274) | Checkpoint storage is set to 'jobmanager'
[2023-06-19 15:26:37,793]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3244 -org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1080).transitionState(1080) | Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) switched from DEPLOYING to INITIALIZING.
[2023-06-19 15:26:37,795]-[INFO] -org.apache.flink.runtime.executiongraph.ExecutionGraph -3246 -org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1416).transitionState(1416) | Source: Custom Source -> Map -> Sink: Unnamed (1/1) (965035c5eef2b8f28ffcfc309b92e203) switched from DEPLOYING to INITIALIZING.
[2023-06-19 15:26:37,836]-[INFO] -org.apache.flink.runtime.taskmanager.Task -3287 -org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1080).transitionState(1080) | Source: Custom Source -> Map -> Sink: Unnamed (1/1)#0 (965035c5eef2b8f28ffcfc309b92e203) switched from INITIALIZING to RUNNING.
[2023-06-19 15:26:37,837]-[INFO] -org.apache.flink.runtime.executiongraph.ExecutionGraph -3288 -org.apache.flink.runtime.executiongraph.Execution.transitionState(Execution.java:1416).transitionState(1416) | Source: Custom Source -> Map -> Sink: Unnamed (1/1) (965035c5eef2b8f28ffcfc309b92e203) switched from INITIALIZING to RUNNING.

這里的日志我截取了最后的部分,可以看到?jīng)]有任何報(bào)錯(cuò)的,我們?cè)诳匆幌律傻?code>CSV文件
flink-csv,FLink,flink,java,大數(shù)據(jù)
這里我們?cè)賹⑽募蜷_,看一下有沒有數(shù)據(jù)
flink-csv,FLink,flink,java,大數(shù)據(jù)
通過圖片可以看到這個(gè)文件中是沒有任何數(shù)據(jù)的.
這里我先說一下原因,然后再結(jié)合源碼看一下,沒有數(shù)據(jù)的原因是數(shù)據(jù)在內(nèi)存中還沒有達(dá)到4k的緩存,沒有到這個(gè)數(shù)據(jù)量就不會(huì)將數(shù)據(jù)刷新到磁盤上,代碼中我們加入了睡眠時(shí)間Thread.sleep(1000)就是為了看到這個(gè)效果,接下來我們就結(jié)合源碼看一下.writeAsCsv這個(gè)方法的緩存刷新是不是4k,我們先看一下.writeAsCsv的內(nèi)容,點(diǎn)擊去源碼后我們先找到下面這段代碼

    @Deprecated
    @PublicEvolving
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(
            String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter) {
        Preconditions.checkArgument(
                getType().isTupleType(),
                "The writeAsCsv() method can only be used on data streams of tuples.");

        CsvOutputFormat<X> of = new CsvOutputFormat<>(new Path(path), rowDelimiter, fieldDelimiter);// 著重看這里,我們?cè)诳匆幌翪svOutputFormat里面的內(nèi)容

        if (writeMode != null) {
            of.setWriteMode(writeMode);
        }

        return writeUsingOutputFormat((OutputFormat<T>) of);
    }

這里我們?cè)邳c(diǎn)擊去看CsvOutputFormat這個(gè)輸出,找到如下內(nèi)容

 @Override
    public void writeRecord(T element) throws IOException {
        int numFields = element.getArity();

        for (int i = 0; i < numFields; i++) {
            Object v = element.getField(i);
            if (v != null) {
                if (i != 0) {
                    this.wrt.write(this.fieldDelimiter);
                }

                if (quoteStrings) {
                    if (v instanceof String || v instanceof StringValue) {
                        this.wrt.write('"'); // 我們要注意到wrt這個(gè)變量
                        this.wrt.write(v.toString());
                        this.wrt.write('"');
                    } else {
                        this.wrt.write(v.toString());
                    }
                } else {
                    this.wrt.write(v.toString());
                }
            } else {
                if (this.allowNullValues) {
                    if (i != 0) {
                        this.wrt.write(this.fieldDelimiter);
                    }
                } else {
                    throw new RuntimeException(
                            "Cannot write tuple with <null> value at position: " + i);
                }
            }
        }

        // add the record delimiter
        this.wrt.write(this.recordDelimiter);
    }

這里我們先看一下writeRecord(T element)這個(gè)方法,實(shí)際上在我們調(diào)用writeAsCsv的時(shí)候底層就是通過writeRecord方法將數(shù)據(jù)寫入csv文件,我們看上面代碼的時(shí)候要注意到this.wrt這個(gè)變量,通過wrt我們就可以找到,對(duì)數(shù)據(jù)刷新到磁盤定義的數(shù)據(jù)量的大小,看一下對(duì)wrt的定義,源碼內(nèi)容如下

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        this.wrt =
                this.charsetName == null
                        ? new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096)) // 看一下這里
                        : new OutputStreamWriter(
                                new BufferedOutputStream(this.stream, 4096), this.charsetName); // 還有這里
    }

通過上面的源碼我們可以看到BufferedOutputStream的緩沖流定義死了為4096,也就是4k大小,這個(gè)參數(shù)是寫死的,我們改變不了,所以在使用writeAsCsv這個(gè)方法時(shí),代碼沒有報(bào)錯(cuò),并且文件中也沒有數(shù)據(jù)時(shí)先不要慌,通過源碼先看看具體的實(shí)現(xiàn)邏輯,我們就可以很快定位到問題,如果代碼中我將Thread.sleep(1000)這行代碼刪除掉的話CSV文件中很快就會(huì)有數(shù)據(jù)的,代碼中我使用的自定義數(shù)據(jù)源,并且每條數(shù)據(jù)其實(shí)很小,還有睡眠1秒的限制,所以導(dǎo)致很久CSV文件中都沒有數(shù)據(jù)生成.
文章內(nèi)容寫到現(xiàn)在也過了很久了,數(shù)據(jù)的大小也滿足4k的條件了,我們看一下文件內(nèi)容
flink-csv,FLink,flink,java,大數(shù)據(jù)
可以看到文件中已經(jīng)生成了數(shù)據(jù),我們?cè)诳匆幌挛募拇笮?br>flink-csv,FLink,flink,java,大數(shù)據(jù)
說到這里我想大家應(yīng)該都理解了,雖然說了這么多關(guān)于writeAsCsv這個(gè)方法的內(nèi)容,但是不建議大家使用這個(gè)方法畢竟屬于過時(shí)的方法,用起來弊端也比較大.文章來源地址http://www.zghlxwxcb.cn/news/detail-566014.html

到了這里,關(guān)于Flink將數(shù)據(jù)寫入CSV文件后文件中沒有數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【已解決】MATLAB寫入csv文件

    【已解決】MATLAB寫入csv文件

    在使用MATLAB的時(shí)候,經(jīng)常需要將數(shù)據(jù)以csv格式保存。接下來就看看如何將MATLAB中的數(shù)據(jù)保存到csv文件中 首先來看看csv格式。csv格式是用逗號(hào)分隔數(shù)據(jù)的一種文件。一行之間的數(shù)據(jù)用逗號(hào)分隔,行與行之間用n分隔。 用MATLAB將數(shù)據(jù)寫入csv文件時(shí),首先用fopen創(chuàng)建一個(gè)有寫入權(quán)限

    2024年02月11日
    瀏覽(24)
  • Python操作寫入/讀取csv文件

    Python操作寫入/讀取csv文件

    網(wǎng)絡(luò)工程師Python數(shù)據(jù)存儲(chǔ)(第1節(jié),CSV文件) 網(wǎng)絡(luò)自動(dòng)化運(yùn)維演進(jìn)的一個(gè)方向大致過程:網(wǎng)絡(luò)工程師從關(guān)注配置制作腳本,完成后上設(shè)備刷配置,慢慢地演化為網(wǎng)絡(luò)工程師關(guān)注和確定設(shè)備配置的某些重要控制參數(shù),而把制作腳本任務(wù)交給Jinja2等去渲染生成,把下發(fā)腳本工作交

    2024年02月03日
    瀏覽(93)
  • Python——csv文件的寫入與讀取

    CSV文件是一種常見的數(shù)據(jù)格式,它以逗號(hào)分隔不同的字段,每行表示一個(gè)數(shù)據(jù)記錄。在Python中,我們可以使用csv模塊來讀取和寫入CSV文件。 在Python中,我們可以使用csv模塊的writer對(duì)象來寫入CSV文件。下面是一個(gè)例子: 在上面的例子中,我們首先創(chuàng)建了要寫入的數(shù)據(jù),它是一

    2024年02月06日
    瀏覽(102)
  • python讀取txt文件內(nèi)容,寫入csv文件中去。

    txt文件中的內(nèi)容大概是這樣的: 2.在圖3中,當(dāng)開關(guān)斷開時(shí),R1、R2_______(串聯(lián)/并聯(lián)),當(dāng)開關(guān)閉合時(shí), 被短路。開關(guān)由斷開轉(zhuǎn)為閉合時(shí),總電阻 ,總電流_______,通過R2的電流_______(變大/變小/不變)。 3.如圖3,當(dāng)開關(guān)閉合時(shí),R2兩端電壓為3V,若R2=10Ω,則電流為_______。斷開

    2023年04月08日
    瀏覽(96)
  • 【Python基礎(chǔ)】一文搞懂:Python 中 csv 文件的寫入與讀取

    【Python基礎(chǔ)】一文搞懂:Python 中 csv 文件的寫入與讀取

    在數(shù)據(jù)處理和數(shù)據(jù)分析領(lǐng)域,CSV (逗號(hào)分隔值) 文件是一種常見的文件格式,用于存儲(chǔ)表格數(shù)據(jù)。Python 通過內(nèi)置的 csv 模塊提供了對(duì) CSV 文件的讀寫支持,使得處理這種類型的文件變得簡(jiǎn)單高效。本文將詳細(xì)介紹如何在 Python 中進(jìn)行 CSV 文件的讀取和寫入操作。 CSV 文件是一種簡(jiǎn)

    2024年04月25日
    瀏覽(30)
  • Flink之FileSink將數(shù)據(jù)寫入parquet文件

    Flink之FileSink將數(shù)據(jù)寫入parquet文件

    在使用FileSink將數(shù)據(jù)寫入列式存儲(chǔ)文件中時(shí)必須使用 forBulkFormat ,列式存儲(chǔ)文件如 ORCFile 、 ParquetFile ,這里就以 ParquetFile 為例結(jié)合代碼進(jìn)行說明. 在Flink 1.15.3 中是通過構(gòu)造 ParquetWriterFactory 然后調(diào)用 forBulkFormat 方法將構(gòu)造好的 ParquetWriterFactory 傳入,這里先講一下構(gòu)造 ParquetWriterF

    2024年02月03日
    瀏覽(19)
  • Python日常用法—將列表信息寫入到csv文件、列表中的元素直接更改
  • flink:通過table api把文件中讀取的數(shù)據(jù)寫入MySQL

    當(dāng)寫入數(shù)據(jù)到外部數(shù)據(jù)庫時(shí),F(xiàn)link 會(huì)使用 DDL 中定義的主鍵。如果定義了主鍵,則連接器將以 upsert 模式工作,否則連接器將以 append 模式工作 文件info.txt

    2024年03月15日
    瀏覽(18)
  • python如何寫入csv

    python如何寫入csv

    在使用python對(duì)文件操作的過程中,你肯定碰到過對(duì)csv文件的操作,下面就python對(duì)csv文件的操作進(jìn)行詳述。 CSV(Comma-Separated Values)逗號(hào)分隔符,也就是每條記錄中的值與值之間是用分號(hào)分隔的。 打開CSV文件并寫入一行數(shù)據(jù) 這里的操作是實(shí)現(xiàn)csv文件的打開以及寫入一行數(shù)據(jù),首

    2024年04月14日
    瀏覽(48)
  • Python寫入CSV出現(xiàn)空行解決方法

    最近在用Python創(chuàng)建寫入csv文件,也就在無形中踩到一些坑,也因此記錄下來,作為糾錯(cuò),也希望幫到大家。 前提:使用csv存儲(chǔ)多維數(shù)組元素,發(fā)現(xiàn)寫入后,使用Excel打開該csv文件會(huì)出現(xiàn)空行,使用文件方式讀取該csv文件輸出會(huì)出現(xiàn)“n\\\"。 解決方法:在csv文件生成時(shí),添加參數(shù)

    2024年02月12日
    瀏覽(18)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包