準備
你需要將這兩個依賴添加到 pom.xml 中
mysql
mysql-connector-java
8.0.0
讀取 kafka 數(shù)據(jù)
這里我依舊用的以前的 student 類,自己本地起了 kafka 然后造一些測試數(shù)據(jù),這里我們測試發(fā)送一條數(shù)據(jù)則 sleep 10s,意味著往 kafka 中一分鐘發(fā) 6 條數(shù)據(jù)。文章來源:http://www.zghlxwxcb.cn/news/detail-770352.html
package com.zhisheng.connectors.mysql.utils;
import com.zhisheng.common.utils.GsonUtil;
import com.zhisheng.connectors.mysql.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Desc: 往kafka中寫數(shù)據(jù),可以使用這個main函數(shù)進行測試
*/
public class KafkaUtil {
public static final String broker_list = "localhost:9092";
public static final String topic = "student"; //kafka topic 需要和 flink 程序用同一個 topic
public static void writeToKafka() throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers"文章來源地址http://www.zghlxwxcb.cn/news/detail-770352.html
到了這里,關(guān)于【Flink】 Flink實時讀取mysql數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!