国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

記一次Flink通過(guò)Kafka寫入MySQL的過(guò)程

這篇具有很好參考價(jià)值的文章主要介紹了記一次Flink通過(guò)Kafka寫入MySQL的過(guò)程。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、前言
總體思路:source -->transform -->sink ,即從source獲取相應(yīng)的數(shù)據(jù)來(lái)源,然后進(jìn)行數(shù)據(jù)轉(zhuǎn)換,將數(shù)據(jù)從比較亂的格式,轉(zhuǎn)換成我們需要的格式,轉(zhuǎn)換處理后,然后進(jìn)行sink功能,也就是將數(shù)據(jù)寫入的相應(yīng)的數(shù)據(jù)庫(kù)DB中或者寫入Hive的HDFS文件存儲(chǔ)。
思路:
記一次Flink通過(guò)Kafka寫入MySQL的過(guò)程,flink,kafka,mysql

pom部分放到最后面。

二、方案及代碼實(shí)現(xiàn)

2.1 Source部分
Source部分構(gòu)建一個(gè)web對(duì)象用于保存數(shù)據(jù)等操作,代碼如下:

package com.lzl.flink;

import java.util.Date;

/**
 * @author lzl
 * @create 2024-01-18 12:19
 * @name pojo
 */
public class Web {
    private String uuid;
    private String ip;
    private String area;
    private String web;
    private String operate;
    private Date createDate;

    public String getArea() {
        return area;
    }

    public String getIp() {
        return ip;
    }

    public String getOperate() {
        return operate;
    }

    public String getUuid() {
        return uuid;
    }

    public String getWeb() {
        return web;
    }

    public Date getCreateDate() {
        return createDate;
    }

    public void setArea(String area) {
        this.area = area;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public void setOperate(String operate) {
        this.operate = operate;
    }

    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    public void setWeb(String web) {
        this.web = web;
    }

    public void setCreateDate(Date createDate) {
        this.createDate = createDate;
    }
}

將生成的數(shù)據(jù)轉(zhuǎn)化為JSON格式,測(cè)試如下:

public static void webDataProducer() throws Exception{
        //構(gòu)建web對(duì)象,在ip為10.117后面加兩個(gè)隨機(jī)數(shù)
        int randomInt1 = RandomUtils.nextInt(1,255);
        int randomInt2 = RandomUtils.nextInt(1,999);
        int randomInt3 = RandomUtils.nextInt(1,99999);
        List<String> areas = Arrays.asList("深圳", "廣州", "上海", "北京", "武漢", "合肥", "杭州", "南京");
        List<String> webs = Arrays.asList("www.taobao.com","www.baidu.com","www.jd.com","www.weibo.com","www.qq.com","www.weixin.com","www.360.com","www.lzl.com","www.xiaomi.com");
        List<String> operates = Arrays.asList("register","view","login","buy","click","comment","jump","care","collect");

        Web web = new Web();  //實(shí)例化一個(gè)web對(duì)象,并向?qū)ο笾蟹湃霐?shù)據(jù)
        web.setUuid("uid_" + randomInt3);
        web.setIp("10.110." + randomInt1 +"." + randomInt2);
        web.setArea(getRandomElement(areas));
        web.setWeb(getRandomElement(webs));
        web.setOperate(getRandomElement(operates));
        web.setCreateDate(new Date());

        // 轉(zhuǎn)換成JSON格式
        String webJson = JSON.toJSONString(web);
        System.out.println(webJson); //打印出來(lái)看看效果

    }
	//構(gòu)建一個(gè)從列表里面任意篩選一個(gè)元素的函數(shù)方法
    public static <T> T getRandomElement(List<T> list) {
        Collections.shuffle(list);
        return list.get(0);
    }

    public static void main(String[] args) {
        while (true) {
            try {
                // 每三秒寫一條數(shù)據(jù)
                TimeUnit.SECONDS.sleep(3);
                webDataProducer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

執(zhí)行測(cè)試結(jié)果如下:
記一次Flink通過(guò)Kafka寫入MySQL的過(guò)程,flink,kafka,mysql
至此Source部分結(jié)束~~~~?。。。。?!

2.2 Transform_1部分

2.2.1 寫入kafka方法函數(shù):

package com.lzl.flink;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * @author lzl
 * @create 2024-01-18 12:18
 * @name KafkaWriter
 */
public class KafkaWriter {

    //kafka集群列表
    public static final String BROKER_LIST = "cdh39:9092,cdh40:9092,cdh41:9092";
    //kafka的topic
    public static final String TOPIC_WEB = "web";
    //kafka序列化的方式,采用字符串的形式
    public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    //value的序列化方式
    public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    public static void writeToKafka() throws Exception {
        Properties props = new Properties(); //實(shí)例化一個(gè)Properties
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("key.serializer", KEY_SERIALIZER);
        props.put("value.serializer", VALUE_SERIALIZER);

        // 構(gòu)建Kafka生產(chǎn)者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 將web生成的數(shù)據(jù)發(fā)送給kafka的記錄
        String webDataJson = webDataProducer();
        ProducerRecord<String,String> record = new ProducerRecord<String,String>(TOPIC_WEB,null,null,webDataJson);
        // 發(fā)送到緩存
        producer.send(record);
        System.out.println("向kafka發(fā)送數(shù)據(jù):" + webDataJson);
        producer.flush();
    }
   public static void main(String[] args) {
        while (true) {
            try {
                // 每三秒寫一條數(shù)據(jù)
                TimeUnit.SECONDS.sleep(3);
                writeToKafka();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

2.2.2 建立 web的topic:
記一次Flink通過(guò)Kafka寫入MySQL的過(guò)程,flink,kafka,mysql
啟動(dòng)程序測(cè)試:
記一次Flink通過(guò)Kafka寫入MySQL的過(guò)程,flink,kafka,mysql
2.2.3 消費(fèi)kafka看看是否有數(shù)據(jù)?

[root@cdh39 kafka]# bin/kafka-console-consumer.sh --bootstrap-server cdh39:9092 --from-beginning --topic web
{"area":"合肥","createDate":1705571020461,"ip":"10.110.104.676","operate":"comment","uuid":"uid_29661","web":"www.qq.com"}
{"area":"北京","createDate":1705571024048,"ip":"10.110.49.479","operate":"jump","uuid":"uid_77119","web":"www.weibo.com"}
{"area":"合肥","createDate":1705571027106,"ip":"10.110.232.960","operate":"click","uuid":"uid_99704","web":"www.taobao.com"}
{"area":"上海","createDate":1705571030140,"ip":"10.110.12.252","operate":"buy","uuid":"uid_99850","web":"www.jd.com"}
{"area":"合肥","createDate":1705571033228,"ip":"10.110.75.328","operate":"care","uuid":"uid_33135","web":"www.qq.com"}
{"area":"上海","createDate":1705571036267,"ip":"10.110.4.862","operate":"collect","uuid":"uid_37279","web":"www.taobao.com"}
{"area":"北京","createDate":1705571039361,"ip":"10.110.139.814","operate":"register","uuid":"uid_33016","web":"www.baidu.com"}
{"area":"武漢","createDate":1705571042422,"ip":"10.110.159.143","operate":"collect","uuid":"uid_26315","web":"www.lzl.com"}
{"area":"南京","createDate":1705571045495,"ip":"10.110.81.685","operate":"login","uuid":"uid_38712","web":"www.baidu.com"}
{"area":"南京","createDate":1705571048545,"ip":"10.110.228.267","operate":"comment","uuid":"uid_23297","web":"www.weibo.com"}
{"area":"武漢","createDate":1705571051623,"ip":"10.110.102.247","operate":"collect","uuid":"uid_77340","web":"www.lzl.com"}
{"area":"武漢","createDate":1705571054687,"ip":"10.110.184.832","operate":"comment","uuid":"uid_35230","web":"www.360.com"}
{"area":"武漢","createDate":1705571057760,"ip":"10.110.90.361","operate":"buy","uuid":"uid_52082","web":"www.lzl.com"}
{"area":"北京","createDate":1705571060825,"ip":"10.110.37.707","operate":"buy","uuid":"uid_45343","web":"www.weixin.com"}
{"area":"上海","createDate":1705571063909,"ip":"10.110.178.901","operate":"care","uuid":"uid_51015","web":"www.baidu.com"}
{"area":"杭州","createDate":1705571066945,"ip":"10.110.153.758","operate":"collect","uuid":"uid_46772","web":"www.xiaomi.com"}
{"area":"合肥","createDate":1705571069980,"ip":"10.110.177.755","operate":"comment","uuid":"uid_78442","web":"www.taobao.com"}
{"area":"廣州","createDate":1705571073020,"ip":"10.110.151.427","operate":"register","uuid":"uid_92174","web":"www.weixin.com"}
{"area":"上海","createDate":1705571076072,"ip":"10.110.217.622","operate":"jump","uuid":"uid_86059","web":"www.xiaomi.com"}

至此,Transform_1部分結(jié)束~~~!?。。?/p>

2.3 Sink部分
創(chuàng)建一個(gè)MySQLSink,繼承RichSinkFunction類。重載里邊的open、invoke 、close方法,在執(zhí)行數(shù)據(jù)sink之前先執(zhí)行open方法,然后開(kāi)始調(diào)用invoke方法,調(diào)用完之后最后執(zhí)行close方法關(guān)閉資源。即在open里面創(chuàng)建數(shù)據(jù)庫(kù)連接,然后調(diào)用invoke執(zhí)行具體的數(shù)據(jù)庫(kù)寫入程序,完畢之后調(diào)用close關(guān)閉和釋放資源。這里要繼承flink的RichSinkFunction接口。代碼如下:

package com.lzl.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.logging.Logger;

/**
 * @author lzl
 * @create 2024-01-22 15:30
 * @name MySqlToPojoSink
 */
public class MySqlToPojoSink extends RichSinkFunction<Web> {

    private static final Logger log = Logger.getLogger(MySqlToPojoSink.class.getName());
    private static final long serialVersionUID = 1L;
    private Connection connection = null;
    private PreparedStatement ps = null;
    private String tableName = "web";
    
 	@Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        log.info("獲取數(shù)據(jù)庫(kù)連接");

        // 通過(guò)Druid獲取數(shù)據(jù)庫(kù)連接,準(zhǔn)備寫入數(shù)據(jù)庫(kù)
        connection = DbUtils.getConnection();

        // 插入數(shù)據(jù)庫(kù)的語(yǔ)句   因?yàn)槲覀兎庋b的pojo的類型為PojoType<com.lzl.flink.Web, fields = [area: String, createDate: Date, ip: String, operate: String, uuid: String, web: String]>
        String insertQuery = "INSERT INTO " +   tableName + "(time,ip,uid,area,web,operate) VALUES (?,?,?,?,?,?)" ;

        // 執(zhí)行插入語(yǔ)句
        ps = connection.prepareStatement(insertQuery);
    }

	 // 重新關(guān)閉方法。   關(guān)閉并釋放資源
    @Override
    public void close() throws Exception {
    super.close();
        if(connection != null) {
            connection.close();
        }
        if (ps != null ) {
            ps.close();
        }
    }

 	// 重寫invoke方法
    @Override
    public void invoke(Web value,Context context) throws Exception {
        //組裝數(shù)據(jù),執(zhí)行插入操作
        ps.setTimestamp(1, new Timestamp(value.getCreateDate().getTime()));
        ps.setString(2,value.getIp());
        ps.setString(3, value.getUuid());
        ps.setString(4, value.getArea());
        ps.setString(5, value.getWeb());
        ps.setString(6, value.getOperate());
        ps.addBatch();

        // 一次性寫入
        int[] count = ps.executeBatch();
        System.out.println("成功寫入MySQL數(shù)量:" + count.length);
    }
}

特別說(shuō)明:從kafka讀取到的內(nèi)容是String,里面包含JSON格式。本文是先將它封裝成Pojo對(duì)象,然后在Sink這里解析它的Value。(開(kāi)始是嘗試通過(guò)apply算子將它轉(zhuǎn)換為L(zhǎng)ist,但是失敗了(時(shí)間有限,后續(xù)再弄),最后是通過(guò)map算子)

至此,Sink部分結(jié)束~!

2.4 Transform_2部分。消費(fèi)kafka 數(shù)據(jù),添加Sink。

package com.lzl.flink;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
 * @author lzl
 * @create 2024-01-19 8:49
 * @name DataSourceFromKafka
 */
public class DataSourceFromKafka {
    public static void transformFromKafka() throws Exception {
        // 構(gòu)建流執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
		//kafka 配置
        Properties prop = new Properties();
        prop.put("bootstrap.servers", KafkaWriter.BROKER_LIST);
        prop.put("zookeeper.connect", "cdh39:2181");
        prop.put("group.id", KafkaWriter.TOPIC_WEB);
        prop.put("key.serializer", KafkaWriter.KEY_SERIALIZER);
        prop.put("value.serializer", KafkaWriter.VALUE_SERIALIZER);
        prop.put("auto.offset.reset", "earliest");

        // 建立流數(shù)據(jù)源
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<String>(
                KafkaWriter.TOPIC_WEB,
                new SimpleStringSchema(),
                prop
        )).setParallelism(1); // 單線程打印,控制臺(tái)不亂序,不影響結(jié)果

        SingleOutputStreamOperator<Web> webStream = env.addSource(new FlinkKafkaConsumer<>(
                "web",
                new SimpleStringSchema(),
                prop
        )).setParallelism(1)
                .map(string-> JSON.parseObject(string,Web.class));

        webStream.addSink(new MySqlToPojoSink());
        env.execute();
    }
    
    public static void main(String[] args) throws Exception {
        while (true) {
            try {
                // 每1毫秒寫一條數(shù)據(jù)
                TimeUnit.MILLISECONDS.sleep(1);
                transformFromKafka();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

如果要設(shè)置空值報(bào)錯(cuò)異常,或者排除空值可以:

package com.lzl.flink;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class DataSourceFromKafka {


    public static void transformFromKafka() throws Exception {
        // 構(gòu)建流執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //checkpoint設(shè)置
        //每隔10s進(jìn)行啟動(dòng)一個(gè)檢查點(diǎn)【設(shè)置checkpoint的周期】
        env.enableCheckpointing(10000);
        //設(shè)置模式為:exactly_one,僅一次語(yǔ)義
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //確保檢查點(diǎn)之間有1s的時(shí)間間隔【checkpoint最小間隔】
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        //檢查點(diǎn)必須在10s之內(nèi)完成,或者被丟棄【checkpoint超時(shí)時(shí)間】
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        //同一時(shí)間只允許進(jìn)行一次檢查點(diǎn)
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //表示一旦Flink程序被cancel后,會(huì)保留checkpoint數(shù)據(jù),以便根據(jù)實(shí)際需要恢復(fù)到指定的checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //設(shè)置statebackend,將檢查點(diǎn)保存在hdfs上面,默認(rèn)保存在內(nèi)存中。先保存到resources目錄下
        env.setStateBackend(new FsStateBackend("D:java//Flink1.17//src//main//resources"));


//         kafka 配置
        Properties prop = new Properties();
        prop.put("bootstrap.servers", KafkaWriter.BROKER_LIST);
        prop.put("zookeeper.connect", "cdh39:2181");
        prop.put("group.id", KafkaWriter.TOPIC_WEB);
        prop.put("key.serializer", KafkaWriter.KEY_SERIALIZER);
        prop.put("value.serializer", KafkaWriter.VALUE_SERIALIZER);
        prop.put("auto.offset.reset", "earliest")

        DataStreamSource<String> webStream = env.addSource(new FlinkKafkaConsumer<>(
                "web",
                new SimpleStringSchema(),
                prop
        )).setParallelism(1);

        //使用process算子 排除空值
        DataStream<Web> processData = webStream.process(new ProcessFunction<String, Web>() {
            @Override
            public void processElement(String s, Context context, Collector<Web> collector) throws Exception {
                try {
                    Web webs = JSON.parseObject(s, Web.class);
                    if (webs != null) {
                        collector.collect(webs);
                    }
                } catch (Exception e) {
                    System.out.println("有空值數(shù)據(jù)");
                }
            }
        });

        processData.addSink(new MySqlToPojoSink());
        env.execute();
    }

    public static void main(String[] args) throws Exception {
        while (true) {
            try {
                // 每1毫秒寫一條數(shù)據(jù)
                TimeUnit.MILLISECONDS.sleep(1);
                transformFromKafka();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

至此,Transfrom結(jié)束~!

2.5 DB部分(這部分可以先做,或者放到前面,因?yàn)樾枰獪y(cè)試)
本次的DB演示采用常規(guī)的MySQL數(shù)據(jù)庫(kù)。采用Druid工具連接。
思路:創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接的工具,用于連接數(shù)據(jù)庫(kù)。使用Druid工具,然后放入具體的Driver,Url,數(shù)據(jù)庫(kù)用戶名和密碼,初始化連接數(shù),最大活動(dòng)連接數(shù),最小空閑連接數(shù)也就是數(shù)據(jù)庫(kù)連接池,創(chuàng)建好之后返回需要的連接。

package com.lzl.flink;

import com.alibaba.druid.pool.DruidDataSource;
import java.sql.Connection;
/**
 * @author lzl
 * @create 2024-01-18 17:58
 * @name DbUtils
 */
public class DbUtils {
    private static DruidDataSource dataSource;

    public static Connection getConnection() throws Exception {
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://cdh129:3306/flink?useSSL=true");
        dataSource.setUsername("root");
        dataSource.setPassword("xxb@5196");
        // 設(shè)置初始化連接數(shù),最大連接數(shù),最小閑置數(shù)
        dataSource.setInitialSize(10);
        dataSource.setMaxActive(50);
        dataSource.setMinIdle(5);
        // 返回連接
        return dataSource.getConnection();
    }
}

數(shù)據(jù)庫(kù)建表語(yǔ)句:

CREATE TABLE `web_traffic_analysis` (
  `time` varchar(64) DEFAULT NULL COMMENT '時(shí)間',
  `ip` varchar(32) DEFAULT NULL COMMENT 'ip地址',
  `uid` varchar(32) DEFAULT NULL COMMENT 'uuid',
  `area` varchar(32) DEFAULT NULL COMMENT '地區(qū)',
  `web` varchar(64) DEFAULT NULL COMMENT '網(wǎng)址',
  `operate` varchar(32) DEFAULT NULL COMMENT '操作'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='網(wǎng)頁(yè)流量分析表'

三、啟動(dòng)程序
開(kāi)始本來(lái)是想將上面所有的功能都寫成函數(shù)方法,然后單獨(dú)開(kāi)一個(gè)Main()主函數(shù)的入口,然后在主函數(shù)下面調(diào)用那些方法(生產(chǎn)數(shù)據(jù)、消費(fèi)數(shù)據(jù)方法)。思路是借鑒python的:if name == ‘main’: 下調(diào)用很多的方法 。但實(shí)際執(zhí)行過(guò)程,是先生成數(shù)據(jù),然后將數(shù)據(jù)寫入kafka,然后再消費(fèi)數(shù)據(jù),過(guò)程執(zhí)行非常慢,這個(gè)方案被pass了。后來(lái)又想到多線程方案,一個(gè)線程跑生產(chǎn)數(shù)據(jù)和寫入數(shù)據(jù),一個(gè)線程跑消費(fèi)數(shù)據(jù)和寫入下游數(shù)據(jù)庫(kù)。這個(gè)方法是測(cè)試成功了,但是跑了一會(huì)兒就出現(xiàn)數(shù)據(jù)的積壓和內(nèi)存oom了,因?yàn)槲以O(shè)定的是1毫秒生產(chǎn)一條數(shù)據(jù),寫入kafka也需要一定的時(shí)間,加上電腦內(nèi)存不足,有點(diǎn)卡,這個(gè)方案也被pass了。最后的方案是將生產(chǎn)數(shù)據(jù)打包放到集群去跑,本地電腦開(kāi)啟消費(fèi)kafka數(shù)據(jù)寫入MySQL數(shù)據(jù)庫(kù)。結(jié)果如下:
生產(chǎn)數(shù)據(jù):
記一次Flink通過(guò)Kafka寫入MySQL的過(guò)程,flink,kafka,mysql
消費(fèi)和寫入數(shù)據(jù)庫(kù)數(shù)據(jù):
記一次Flink通過(guò)Kafka寫入MySQL的過(guò)程,flink,kafka,mysql
數(shù)據(jù)庫(kù)數(shù)據(jù):
記一次Flink通過(guò)Kafka寫入MySQL的過(guò)程,flink,kafka,mysql
至此結(jié)束,后面有其他想法再補(bǔ)充~!

多線程部分代碼:

package com.example.study;

import com.lzl.flink.DataSourceFromKafka;
import com.lzl.flink.KafkaWriter;

public class WebApplication {
    public static void main(String[] args) throws Exception {
    // 創(chuàng)建線程1
     Thread threadOne = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true){
                try{
                    KafkaWriter kafkaWriter = new KafkaWriter();
                    kafkaWriter.webDataProducer();
                    kafkaWriter.writeToKafka();
                    System.out.println("線程一在跑~!");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    });

  // 創(chuàng)建線程2
 Thread threadTwo = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                DataSourceFromKafka dataSourceFromKafka = new DataSourceFromKafka();
                try {
                    dataSourceFromKafka.transformFromKafka();
                    System.out.println("線程二在跑~!");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    });

    //啟動(dòng)線程
    threadOne.start();
    threadTwo.start();
    Thread.sleep(5);

    }
}

結(jié)果:
記一次Flink通過(guò)Kafka寫入MySQL的過(guò)程,flink,kafka,mysql文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-819833.html

到了這里,關(guān)于記一次Flink通過(guò)Kafka寫入MySQL的過(guò)程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 記一次模糊查詢踩坑 Flink+ES

    公司需要對(duì)商品名稱進(jìn)行模糊模糊查詢,考慮到商品表存量數(shù)據(jù)千萬(wàn)級(jí),直接數(shù)據(jù)庫(kù)模糊查詢效率肯定極其低下,所以選擇使用 ElasticSearch 對(duì)商品信息進(jìn)行模糊查詢。 因?yàn)樾枰嬖械牟樵兘涌冢3衷胁樵兘涌诘娜雲(yún)⒊鰠?,所以需要全?增量同步MySQL數(shù)據(jù)到ES進(jìn)行索引

    2024年02月05日
    瀏覽(29)
  • 記一次Kafka重復(fù)消費(fèi)解決過(guò)程

    ? ? ? ? 起因:車聯(lián)網(wǎng)項(xiàng)目開(kāi)發(fā),車輛發(fā)生故障需要給三個(gè)系統(tǒng)推送消息,故障上報(bào)較為頻繁,所以為了不阻塞主流程,采用了使用kafka。消費(fèi)方負(fù)責(zé)推送并保存推送記錄,但在一次壓測(cè)中發(fā)現(xiàn),實(shí)際只發(fā)生了10次故障,但是推送記錄卻有30多條。 ????????問(wèn)題排查,發(fā)現(xiàn)

    2024年02月13日
    瀏覽(25)
  • flink寫入到kafka 大坑解析。

    flink寫入到kafka 大坑解析。

    1.kafka能不能發(fā)送null消息? ? ?能! 2 flink能不能發(fā)送null消息到kafka? 不能! ? ? 這里就報(bào)了java的最常見(jiàn)錯(cuò)誤 空指針,原因就是flink要把kafka的消息getbytes。所以flink不能發(fā)送null到kafka。 這種問(wèn)題會(huì)造成什么后果? flink直接掛掉。 如果我們采取了失敗重試機(jī)制會(huì)怎樣? 數(shù)據(jù)重

    2024年02月15日
    瀏覽(15)
  • 【Flink-Kafka-To-Hive】使用 Flink 實(shí)現(xiàn) Kafka 數(shù)據(jù)寫入 Hive

    需求描述: 1、數(shù)據(jù)從 Kafka 寫入 Hive。 2、相關(guān)配置存放于 Mysql 中,通過(guò) Mysql 進(jìn)行動(dòng)態(tài)讀取。 3、此案例中的 Kafka 是進(jìn)行了 Kerberos 安全認(rèn)證的,如果不需要自行修改。 4、Flink 集成 Kafka 寫入 Hive 需要進(jìn)行 checkpoint 才能落盤至 HDFS。 5、先在 Hive 中創(chuàng)建表然后動(dòng)態(tài)獲取 Hive 的表

    2024年02月03日
    瀏覽(23)
  • 【Flink-Kafka-To-ClickHouse】使用 Flink 實(shí)現(xiàn) Kafka 數(shù)據(jù)寫入 ClickHouse

    需求描述: 1、數(shù)據(jù)從 Kafka 寫入 ClickHouse。 2、相關(guān)配置存放于 Mysql 中,通過(guò) Mysql 進(jìn)行動(dòng)態(tài)讀取。 3、此案例中的 Kafka 是進(jìn)行了 Kerberos 安全認(rèn)證的,如果不需要自行修改。 4、先在 ClickHouse 中創(chuàng)建表然后動(dòng)態(tài)獲取 ClickHouse 的表結(jié)構(gòu)。 5、Kafka 數(shù)據(jù)為 Json 格式,通過(guò) FlatMap 扁平

    2024年02月03日
    瀏覽(23)
  • 6.2、Flink數(shù)據(jù)寫入到Kafka

    6.2、Flink數(shù)據(jù)寫入到Kafka

    目錄 1、添加POM依賴 2、API使用說(shuō)明 3、序列化器 3.1 使用預(yù)定義的序列化器 3.2 使用自定義的序列化器 4、容錯(cuò)保證級(jí)別 4.1?至少一次 的配置 4.2?精確一次 的配置 5、這是一個(gè)完整的入門案例 Apache Flink 集成了通用的 Kafka 連接器,使用時(shí)需要根據(jù)生產(chǎn)環(huán)境的版本引入相應(yīng)的依賴

    2024年02月09日
    瀏覽(15)
  • flink日志實(shí)時(shí)采集寫入Kafka/ElasticSearch

    flink日志實(shí)時(shí)采集寫入Kafka/ElasticSearch

    由于公司想要基于flink的日志做實(shí)時(shí)預(yù)警功能,故需要實(shí)時(shí)接入,并刷入es進(jìn)行分析。 日志接入必須異步,不能影響服務(wù)性能 kafka集群宕機(jī),依舊能夠提交flink任務(wù)且運(yùn)行任務(wù) kafka集群掛起恢復(fù),可以依舊續(xù)寫實(shí)時(shí)運(yùn)行日志 在類上加上@Plugin注解,標(biāo)記為自定義appender 在類加上

    2024年02月08日
    瀏覽(23)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定義 Sink 消費(fèi) Kafka 數(shù)據(jù)寫入 RocketMQ

    這里的 maven 依賴比較冗余,推薦大家都加上,后面陸續(xù)優(yōu)化。 注意: 1、此程序中所有的相關(guān)配置都是通過(guò) Mysql 讀取的(生產(chǎn)環(huán)境中沒(méi)有直接寫死的,都是通過(guò)配置文件動(dòng)態(tài)配置),大家實(shí)際測(cè)試過(guò)程中可以將相關(guān)配置信息寫死。 2、此程序中 Kafka 涉及到了 Kerberos 認(rèn)證操作

    2024年02月03日
    瀏覽(21)
  • 【Flink-Kafka-To-Mongo】使用 Flink 實(shí)現(xiàn) Kafka 數(shù)據(jù)寫入 Mongo(根據(jù)對(duì)應(yīng)操作類型進(jìn)行增、刪、改操作,寫入時(shí)對(duì)時(shí)間類型字段進(jìn)行單獨(dú)處理)

    需求描述: 1、數(shù)據(jù)從 Kafka 寫入 Mongo。 2、相關(guān)配置存放于 Mysql 中,通過(guò) Mysql 進(jìn)行動(dòng)態(tài)讀取。 3、此案例中的 Kafka 是進(jìn)行了 Kerberos 安全認(rèn)證的,如果不需要自行修改。 4、Kafka 數(shù)據(jù)為 Json 格式,獲取到的數(shù)據(jù)根據(jù)操作類型字段進(jìn)行增刪改操作。 5、讀取時(shí)使用自定義 Source,寫

    2024年02月22日
    瀏覽(31)
  • 記一次批量更新mysql數(shù)據(jù)過(guò)程

    記一次批量更新mysql數(shù)據(jù)過(guò)程

    一、前言 需求背景:mysql數(shù)據(jù)庫(kù)中有一個(gè)表的數(shù)據(jù)(600多萬(wàn))有一個(gè)字段的內(nèi)容需要解密再通過(guò)另外一種加密方式進(jìn)行加密再回存。通過(guò)java程序計(jì)算完成更新。 二、方案一 一條條計(jì)算更新。這里是將手機(jī)號(hào)解密,在通過(guò)另外一種方式回存。 算法步驟: 1、查詢需要解密的數(shù)

    2024年02月10日
    瀏覽(20)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包