1 Requirements
Using the Flink Java API in local mode, consume a Kafka topic and write the data directly to HDFS.
Flink version: 1.13
Kafka version: 0.8
Hadoop version: 3.1.4
2 Experiment Procedure
2.1 Starting the services
To have Flink consume data from Kafka and write it to HDFS in real time, the following components typically need to be running:
[root@hadoop10 ~]# jps
3073 SecondaryNameNode
2851 DataNode
2708 NameNode
12854 Jps
1975 StandaloneSessionClusterEntrypoint
2391 QuorumPeerMain
2265 TaskManagerRunner
9882 ConsoleProducer
9035 Kafka
3517 NodeManager
3375 ResourceManager
Make sure ZooKeeper is running, because the Flink Kafka consumer in this setup depends on it.
Make sure the Kafka server is running, because the Flink Kafka consumer must connect to a Kafka broker.
Start Flink's JobManager and TaskManager, the core components that execute Flink jobs.
With all of these components up, the Flink job can consume data from Kafka and write it to HDFS.
- The specific startup commands are not repeated here.
2.2 Starting a Kafka producer
- In this setup Kafka is not running as a background daemon;
- Create the topic and start a console producer for it by running the following from Kafka's bin directory;
- You can then produce data by typing arbitrary messages into this window and sending them.
kafka-topics.sh --zookeeper hadoop10:2181 --create --topic topic1 --partitions 1 --replication-factor 1
kafka-console-producer.sh --broker-list hadoop10:9092 --topic topic1
3 Java API Development
3.1 Dependencies
The POM below contains all of the project's dependencies, including Flink, Spark, HBase, ClickHouse, and others. This task does not need all of them: the code in this article only uses flink-java, flink-streaming-java, flink-clients, flink-connector-kafka, and flink-shaded-hadoop-2-uber (plus log4j for logging). All artifacts can be downloaded from the Aliyun mirror or other open-source Maven mirrors.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<flink.version>1.13.6</flink.version>
<hbase.version>2.4.0</hbase.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.14.6</version>
</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.32</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.20</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-ssh</artifactId>
<version>2.8</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>wagon-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<!-- Local path of the jar to upload -->
<fromFile>target/${project.build.finalName}.jar</fromFile>
<!-- Remote destination for the copy -->
<url>scp://root:root@hadoop10:/opt/app</url>
</configuration>
</plugin>
</plugins>
</build>
</project>
- The full POM above is provided for reference.
3.2 Code
- Note that the Kafka and HDFS parts require the server addresses and hostname mappings to be configured.
- This code consumes the topic1 topic and writes the data directly to HDFS.
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class Test9_kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop10:9092");
properties.setProperty("group.id", "test");
// Use FlinkKafkaConsumer as the source
DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));
String outputPath = "hdfs://hadoop10:8020/out240102";
// Use StreamingFileSink to write the data to HDFS
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.build();
// Add the sink so the Kafka data is written directly to HDFS
ds1.addSink(sink);
ds1.print();
env.execute("Flink Kafka HDFS");
}
}
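By default, FlinkKafkaConsumer resumes from the consumer group's committed offsets. When re-running the job during testing it can be useful to replay the topic from the beginning; a minimal sketch of that variant (only the consumer setup changes, the rest of the job stays the same):
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties);
// Re-read the topic from the beginning on every run; the default behavior is setStartFromGroupOffsets()
consumer.setStartFromEarliest();
DataStream<String> ds1 = env.addSource(consumer);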
4 Verification
STEP1
Run the code in the IDE. The job starts and, apart from log output, the console is initially empty. The screenshot below shows the console after data from the producer has been received and consumed.
STEP2
Start the producer and send some data; there is no format requirement, so type anything. The messages sent here appear as printed output in the console from STEP1.
STEP3
Check the corresponding directory in HDFS; the data has been written.
Multiple in-progress files were generated here because I ran the tests, and therefore the code, several times. There is a delay between records being printed in the IDE and the data being flushed to HDFS, so you may need to wait and refresh the HDFS view; you can then watch the file size grow from 0 as the data is written.
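One related detail worth noting: with the row-format StreamingFileSink, part files are only finalized when checkpointing is enabled; without checkpoints they stay in the in-progress state. The code above does not enable checkpointing. A minimal sketch of how checkpointing and a rolling policy could be added (the interval and thresholds are arbitrary example values):
// additional import: org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
env.enableCheckpointing(10_000); // checkpoint every 10 seconds so pending part files get committed
StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(60_000)        // roll a new part file at least every 60 seconds
                .withInactivityInterval(30_000)      // or after 30 seconds without new records
                .withMaxPartSize(128 * 1024 * 1024)  // or once a part file reaches 128 MB
                .build())
        .build();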
5 Time Window
- Another implementation idea: write the data to HDFS in real time in the form of a time window. The experiment procedure is the same as above; the screenshots show the data being sent and consumed, and the data visible in HDFS.
package day2;
import day2.CustomProcessFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class Test9_kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop10:9092");
properties.setProperty("group.id", "test");
// Use FlinkKafkaConsumer as the source
DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));
String outputPath = "hdfs://hadoop10:8020/out240102";
// Use StreamingFileSink to write the data to HDFS
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.build();
// Pass the data through a custom ProcessFunction before writing it to HDFS
ds1.process(new CustomProcessFunction()) // use the custom ProcessFunction defined below
.addSink(sink);
// Execute the job
env.execute("Flink Kafka HDFS");
}
}
package day2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class CustomProcessFunction extends ProcessFunction<String, String> {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
// Per-record logic can be added here before the data reaches the HDFS sink
System.out.println(value); // print the record to the console
out.collect(value);
}
}
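Note that CustomProcessFunction above is a simple pass-through and does not by itself group records into time windows. If an actual time window is wanted, one possible sketch (an illustration under my own assumptions, not code from the original article) is to collect the records into tumbling processing-time windows before the sink, reusing the ds1 stream and sink from the job above:
// additional imports:
// org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
// org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
// org.apache.flink.streaming.api.windowing.time.Time
// org.apache.flink.streaming.api.windowing.windows.TimeWindow
// org.apache.flink.util.Collector
ds1.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))) // group the stream into 10-second tumbling windows
        .apply(new AllWindowFunction<String, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<String> values, Collector<String> out) {
                // emit every record collected in this window; the sink then writes them to HDFS
                for (String value : values) {
                    out.collect(value);
                }
            }
        })
        .addSink(sink);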
This concludes the walkthrough of consuming Kafka data in real time with the Flink Java API and writing it to HDFS.