国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

實戰(zhàn)Flink Java api消費kafka實時數(shù)據(jù)落盤HDFS

這篇具有很好參考價值的文章主要介紹了實戰(zhàn)Flink Java api消費kafka實時數(shù)據(jù)落盤HDFS。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1 需求分析

在Java api中,使用flink本地模式,消費kafka主題,并直接將數(shù)據(jù)存入hdfs中。

flink版本1.13

kafka版本0.8

hadoop版本3.1.4

2 實驗過程

2.1 啟動服務程序

為了完成 Flink 從 Kafka 消費數(shù)據(jù)并實時寫入 HDFS 的需求,通常需要啟動以下組件:

[root@hadoop10 ~]# jps
3073 SecondaryNameNode
2851 DataNode
2708 NameNode
12854 Jps
1975 StandaloneSessionClusterEntrypoint
2391 QuorumPeerMain
2265 TaskManagerRunner
9882 ConsoleProducer
9035 Kafka
3517 NodeManager
3375 ResourceManager

確保 Zookeeper 在運行,因為 Flink 的 Kafka Consumer 需要依賴 Zookeeper。

確保 Kafka Server 在運行,因為 Flink 的 Kafka Consumer 需要連接到 Kafka Broker。

啟動 Flink 的 JobManager 和 TaskManager,這是執(zhí)行 Flink 任務的核心組件。

確保這些組件都在運行,以便 Flink 作業(yè)能夠正常消費 Kafka 中的數(shù)據(jù)并將其寫入 HDFS。

  • 具體的啟動命令在此不再贅述。

2.2 啟動kafka生產(chǎn)

  • 當前kafka沒有在守護進程后臺運行;
  • 創(chuàng)建主題,啟動該主題的生產(chǎn)者,在kafka的bin目錄下執(zhí)行;
  • 此時可以生產(chǎn)數(shù)據(jù),從該窗口鍵入任意數(shù)據(jù)進行發(fā)送。
kafka-topics.sh --zookeeper hadoop10:2181 --create --topic topic1 --partitions 1 --replication-factor 1

kafka-console-producer.sh --broker-list hadoop10:9092 --topic topic1

flink 消費kafka java,flink,java,kafka

3 Java API 開發(fā)

3.1 依賴

此為項目的所有依賴,包括flink、spark、hbase、ck等,實際本需求無需全部依賴,均可在阿里云或者maven開源鏡像站下載。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.6</flink.version>
        <hbase.version>2.4.0</hbase.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
           <!-- <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>

       <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.32</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.20</version>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <extension>
                <groupId>org.apache.maven.wagon</groupId>
                <artifactId>wagon-ssh</artifactId>
                <version>2.8</version>
            </extension>
        </extensions>

        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>wagon-maven-plugin</artifactId>
                <version>1.0</version>
                <configuration>
                    <!--上傳的本地jar的位置-->
                    <fromFile>target/${project.build.finalName}.jar</fromFile>
                    <!--遠程拷貝的地址-->
                    <url>scp://root:root@hadoop10:/opt/app</url>
                </configuration>
            </plugin>
        </plugins>

    </build>



</project>

  • 依賴參考
    flink 消費kafka java,flink,java,kafka

3.2 代碼部分

  • 請注意kafka和hdfs的部分需要配置服務器地址,域名映射。
  • 此代碼的功能是消費topic1主題,將數(shù)據(jù)直接寫入hdfs中。
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Test9_kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop10:9092");
        properties.setProperty("group.id", "test");

        // 使用FlinkKafkaConsumer作為數(shù)據(jù)源
        DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));

        String outputPath = "hdfs://hadoop10:8020/out240102";

        // 使用StreamingFileSink將數(shù)據(jù)寫入HDFS
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        // 添加Sink,將Kafka數(shù)據(jù)直接寫入HDFS
        ds1.addSink(sink);
        ds1.print();
        env.execute("Flink Kafka HDFS");
    }
}

4 實驗驗證

STEP1

運行idea代碼,程序開始執(zhí)行,控制臺除了日志外為空。下圖是已經(jīng)接收到生產(chǎn)者的數(shù)據(jù)后,消費在控制臺的截圖。

flink 消費kafka java,flink,java,kafka

STEP2

啟動生產(chǎn)者,將數(shù)據(jù)寫入,數(shù)據(jù)無格式限制,隨意填寫。此時發(fā)送的數(shù)據(jù),是可以在STEP1中的控制臺中看到屏幕打印結果的。
flink 消費kafka java,flink,java,kafka

STEP3

在HDFS中查看對應的目錄,可以看到數(shù)據(jù)已經(jīng)寫入完成。
我這里生成了多個inprogress文件,是因為我測試了多次,斷碼運行了多次。ide打印在屏幕后,到hdfs落盤寫入,中間有一定時間,需要等待,在HDFS中刷新數(shù)據(jù),可以看到文件大小從0到被寫入數(shù)據(jù)的過程。
flink 消費kafka java,flink,java,kafka

5 時間窗口

  • 使用另一種思路實現(xiàn),以時間窗口的形式,將數(shù)據(jù)實時寫入HDFS,實驗方法同上。截圖為發(fā)送數(shù)據(jù)消費,并且在HDFS中查看到數(shù)據(jù)。
    flink 消費kafka java,flink,java,kafka

flink 消費kafka java,flink,java,kafka文章來源地址http://www.zghlxwxcb.cn/news/detail-821474.html

package day2;

import day2.CustomProcessFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Test9_kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop10:9092");
        properties.setProperty("group.id", "test");

        // 使用FlinkKafkaConsumer作為數(shù)據(jù)源
        DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));

        String outputPath = "hdfs://hadoop10:8020/out240102";

        // 使用StreamingFileSink將數(shù)據(jù)寫入HDFS
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        // 在一個時間窗口內(nèi)將數(shù)據(jù)寫入HDFS
        ds1.process(new CustomProcessFunction())  // 使用自定義 ProcessFunction
                .addSink(sink);

        // 執(zhí)行程序
        env.execute("Flink Kafka HDFS");
    }
}
package day2;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class CustomProcessFunction extends ProcessFunction<String, String> {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // 在這里可以添加具體的邏輯,例如將數(shù)據(jù)寫入HDFS
        System.out.println(value);  // 打印結果到屏幕
        out.collect(value);
    }
}

到了這里,關于實戰(zhàn)Flink Java api消費kafka實時數(shù)據(jù)落盤HDFS的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Flink SQL和Table API實現(xiàn)消費kafka寫入mysql

    Flink SQL和Table API實現(xiàn)消費kafka寫入mysql

    1、構建 table環(huán)境 2、構建source kafka 方式一:API 方式二:Flink SQL 3、構建sink mysql? 4、寫入將source表寫入sink表 方式一:API 方式二:Flink SQL 5、手動執(zhí)行 6、測試 (1)連接kafka生產(chǎn)者 (2)造數(shù)據(jù) (3)mysql查看入庫情況

    2024年01月16日
    瀏覽(24)
  • JAVA實時獲取kafka各個主題下分區(qū)消息的消費情況

    通過指定 主題 和 消費者組 調用方法,實時查看主題下分區(qū)消息的消費情況(消息總數(shù)量、消費消息數(shù)量、未消費的消息數(shù)量)。

    2024年02月13日
    瀏覽(26)
  • flink正常消費kafka數(shù)據(jù),flink沒有做checkpoint,kafka位點沒有提交

    1、背景 flink消費kafka數(shù)據(jù),多并發(fā),實現(xiàn)雙流join 2、現(xiàn)象 (1)flink任務消費kafka數(shù)據(jù),其中數(shù)據(jù)正常消費,kafka顯示消息堆積,位點沒有提交,并且flink任務沒有做checkpoint (2)其中一個流的subtask顯示finished (3)無背壓 3、問題原因 (1)其中一個topic分區(qū)為1 (2)配置的并行

    2024年02月13日
    瀏覽(22)
  • 大數(shù)據(jù)-玩轉數(shù)據(jù)-FLINK-從kafka消費數(shù)據(jù)

    大數(shù)據(jù)-玩轉數(shù)據(jù)-Kafka安裝 運行本段代碼,等待kafka產(chǎn)生數(shù)據(jù)進行消費。

    2024年02月14日
    瀏覽(23)
  • 輕松通關Flink第24講:Flink 消費 Kafka 數(shù)據(jù)業(yè)務開發(fā)

    在上一課時中我們提過在實時計算的場景下,絕大多數(shù)的數(shù)據(jù)源都是消息系統(tǒng),而 Kafka 從眾多的消息中間件中脫穎而出,主要是因為 高吞吐 、 低延遲 的特點;同時也講了 Flink 作為生產(chǎn)者像 Kafka 寫入數(shù)據(jù)的方式和代碼實現(xiàn)。這一課時我們將從以下幾個方面介紹 Flink 消費

    2024年02月08日
    瀏覽(25)
  • Flink使用 KafkaSource消費 Kafka中的數(shù)據(jù)

    目前,很多 flink相關的書籍和網(wǎng)上的文章講解如何對接 kafka時都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已經(jīng)將 FlinkKafkaConsumer標記為 deprecated(不推薦),如下: 新版本的 flink應該使用 KafkaSource來消費 kafka中的數(shù)據(jù),詳細代碼如下: 開發(fā)者在工作中應該盡量避

    2024年02月15日
    瀏覽(21)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定義 Sink 消費 Kafka 數(shù)據(jù)寫入 RocketMQ

    這里的 maven 依賴比較冗余,推薦大家都加上,后面陸續(xù)優(yōu)化。 注意: 1、此程序中所有的相關配置都是通過 Mysql 讀取的(生產(chǎn)環(huán)境中沒有直接寫死的,都是通過配置文件動態(tài)配置),大家實際測試過程中可以將相關配置信息寫死。 2、此程序中 Kafka 涉及到了 Kerberos 認證操作

    2024年02月03日
    瀏覽(21)
  • flink如何初始化kafka數(shù)據(jù)源的消費偏移

    我們知道在日常非flink場景中消費kafka主題時,我們只要指定了消費者組,下次程序重新消費時是可以從上次消費停止時的消費偏移開始繼續(xù)消費的,這得益于kafka的_offset_主題保存的關于消費者組和topic偏移位置的具體偏移信息,那么flink應用中重啟flink應用時,flink是從topic的什

    2024年02月16日
    瀏覽(30)
  • Idea本地跑flink任務時,總是重復消費kafka的數(shù)據(jù)(kafka->mysql)

    Idea本地跑flink任務時,總是重復消費kafka的數(shù)據(jù)(kafka->mysql)

    1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 Idea中執(zhí)行任務時,沒法看到JobManager的錯誤,以至于我以為是什么特殊的原因導致任務總是反復消費。在close方法中,增加日志,發(fā)現(xiàn)jdbc連接被關閉了。 重新消費,jdbc連接又啟動了。 注意,在Flink的函數(shù)中,open和close方法

    2024年02月07日
    瀏覽(25)
  • 【項目實戰(zhàn)】Java 開發(fā) Kafka 消費者

    【項目實戰(zhàn)】Java 開發(fā) Kafka 消費者

    ?? 博主介紹 : 博主從事應用安全和大數(shù)據(jù)領域,有8年研發(fā)經(jīng)驗,5年面試官經(jīng)驗,Java技術專家,WEB架構師,阿里云專家博主,華為云云享專家,51CTO TOP紅人 Java知識圖譜點擊鏈接: 體系化學習Java(Java面試專題) ???? 感興趣的同學可以收藏關注下 , 不然下次找不到喲

    2024年02月16日
    瀏覽(31)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包