
Flink Learning Journey (3): Flink Source Operators (Data Sources)

This article introduces Flink source operators (data sources). If anything here is wrong or incomplete, corrections are welcome.

1. Flink Data Sources

        Flink can ingest data from a variety of sources and build a DataStream from it for processing and transformation. The source is the input end of the entire data-processing program. Common options include:

Collections
Files
Socket streams
Kafka
Custom sources

2. Examples

2.1. Reading Data from a Collection

        Create a FlinkSource_List class, along with a Student class (name, age, and sex fields are enough for testing).
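The Student class itself is not shown in the article; a minimal sketch that reproduces the toString output used below could look like this (public fields and a no-arg constructor also satisfy Flink's POJO serialization rules):

```java
// Minimal Student POJO (a sketch; the original article does not show this class).
class Student {
    public String name;
    public int age;
    public int sex;

    // No-arg constructor, required for Flink POJO serialization
    public Student() {}

    public Student(String name, int age, int sex) {
        this.name = name;
        this.age = age;
        this.sex = sex;
    }

    @Override
    public String toString() {
        return "Student{name='" + name + "', age=" + age + ", sex=" + sex + "}";
    }
}
```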

package com.qiyu;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * @author MR.Liu
 * @version 1.0
 * @date 2023-10-18 16:13
 */
public class FlinkSource_List {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        ArrayList<Student> clicks = new ArrayList<>();
        clicks.add(new Student("Mary",25,1));
        clicks.add(new Student("Bob",26,2));
        DataStream<Student> stream = env.fromCollection(clicks);
        stream.print();
        env.execute();
    }
}

Output:

Student{name='Mary', age=25, sex=1}
Student{name='Bob', age=26, sex=2}

2.2. Reading Data from a File

File contents:

spark
hello world kafka spark
hadoop spark

package com.qiyu;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author MR.Liu
 * @version 1.0
 * @date 2023-10-18 16:31
 */
public class FlinkSource_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> stream = env.readTextFile("input/words.txt");
        stream.print();
        env.execute();
    }
}

Output (printed as-is, no processing applied):

spark
hello world kafka spark
hadoop spark

2.3. Reading Data from a Socket

package com.qiyu;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author MR.Liu
 * @version 1.0
 * @date 2023-10-18 17:41
 */
public class FlinkSource_Socket {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the text stream from the socket
        DataStreamSource<String> lineDSS = env.socketTextStream("192.168.220.130",
                7777);
        lineDSS.print();
        env.execute();
    }
}

Output:

On the server, run:

 nc -lk 7777

then type a few lines of input.


Console output:

6> hello
7> world

2.4. Reading Data from Kafka

Add the Kafka connector dependency to pom.xml:

      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>

package com.qiyu;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * @author MR.Liu
 * @version 1.0
 * @date 2023-10-19 10:01
 */
public class FlinkSource_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> stream = env.addSource(
                new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));
        stream.print("Kafka");
        env.execute();
    }
}
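Note that FlinkKafkaConsumer is the legacy connector API; from Flink 1.14 on it is deprecated in favor of the KafkaSource builder. A sketch of the equivalent setup for a newer Flink version (same brokers and topic as above; assumes the newer flink-connector-kafka artifact is on the classpath, and is shown here as an untested alternative, not the article's code):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkSource_KafkaNew {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // The builder replaces the Properties-based consumer configuration
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092")
                .setTopics("clicks")
                .setGroupId("consumer-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // fromSource replaces addSource for the new unified source API
        DataStreamSource<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
        stream.print("Kafka");
        env.execute();
    }
}
```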

Start ZooKeeper and Kafka.

Create the topic:

bin/kafka-topics.sh --create --bootstrap-server hadoop102:9092 --replication-factor 1 --partitions 1 --topic clicks

Producer and consumer commands:

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092  --topic clicks
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092  --topic clicks --from-beginning

After starting the producer, type a few messages.

Run the Java class; its output mirrors what the producer sent:

Kafka> flinks
Kafka> hadoop
Kafka> hello
Kafka> nihaop

2.5. Reading Data from a Custom Source

        In most cases the sources above are sufficient, but special situations call for a custom data source. The implementation goes as follows:

        1. Write the custom Source

package com.qiyu;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * @author MR.Liu
 * @version 1.0
 * @date 2023-10-19 10:37
 */

/***
 * Mainly implements two methods: run() and cancel()
 */
public class FlinkSource_Custom implements SourceFunction<Student> {


    // Flag controlling data generation; volatile so cancel(), called from
    // another thread, is visible to the emitting loop in run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        Random random = new Random(); // 在指定的數(shù)據(jù)集中隨機選取數(shù)據(jù)
        String[] name = {"Mary", "Alice", "Bob", "Cary"};
        int[] sex = {1,2};
        int age = 0;
        while (running) {
            sourceContext.collect(new Student(
                    name[random.nextInt(name.length)],
                    sex[random.nextInt(sex.length)],
                    random.nextInt(100)
            ));
            // 隔 1 秒生成一個點擊事件,方便觀測
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
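The run()/cancel() pair above is a standard cooperative-cancellation pattern: run() loops on a volatile flag, and cancel() flips it from another thread. Stripped of the Flink API, the same mechanism can be demonstrated with plain threads (a standalone sketch; the class name and counter are illustrative, not Flink code):

```java
public class CancelFlagDemo {
    // volatile so the write in cancel() is visible to the emitting thread
    private volatile boolean running = true;
    int emitted = 0;

    public void run() throws InterruptedException {
        while (running) {
            emitted++;          // stands in for sourceContext.collect(...)
            Thread.sleep(10);
        }
    }

    public void cancel() {
        running = false;
    }

    public static void main(String[] args) throws Exception {
        CancelFlagDemo demo = new CancelFlagDemo();
        Thread worker = new Thread(() -> {
            try {
                demo.run();
            } catch (InterruptedException ignored) {
            }
        });
        worker.start();
        Thread.sleep(100);
        demo.cancel();          // flips the flag; the loop exits at its next check
        worker.join(1000);
        System.out.println("stopped cleanly: " + !worker.isAlive());
    }
}
```

Without volatile, the JVM is free to let the worker thread keep reading a stale cached value of the flag, so cancellation might never take effect.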

        2. Write the driver program

package com.qiyu;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author MR.Liu
 * @version 1.0
 * @data 2023-10-19 10:46
 */
public class FlinkSource_Custom2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // With the custom SourceFunction in hand, call addSource()
        DataStreamSource<Student> stream = env.addSource(new FlinkSource_Custom());
        stream.print("SourceCustom");
        env.execute();
    }
}

Run the driver program; output:

SourceCustom> Student{name='Mary', age=46, sex=1}
SourceCustom> Student{name='Cary', age=52, sex=2}
SourceCustom> Student{name='Bob', age=14, sex=1}
SourceCustom> Student{name='Alice', age=84, sex=1}
SourceCustom> Student{name='Alice', age=82, sex=2}
SourceCustom> Student{name='Cary', age=28, sex=1}
