国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【Flink】關(guān)于jvm元空間溢出,mysql binlog沖突的問題解決

這篇具有很好參考價值的文章主要介紹了【Flink】關(guān)于jvm元空間溢出,mysql binlog沖突的問題解決。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

問題一:7張表是同一個mysql中的,我們進行增量同步時分別用不同的flink任務(wù)讀取,造成mysql server-id沖突問題,如下:

【Flink】關(guān)于jvm元空間溢出,mysql binlog沖突的問題解決,大數(shù)據(jù),flink,jvm,mysql

Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event ‘’ at 4, the last event read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118, the last byte read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118. Error code: 1236; SQLSTATE: HY000.

問題分析:主要是多個任務(wù)都讀取的同一個binlog造成serverid沖突;

網(wǎng)上有設(shè)置server-id的方法,這個是可以解決的,但是需要分方案來談;
例如:我們是需要一個任務(wù)通用的(7張表重復提交7次,更改flink傳參)
這種就不適合我們,因為每次的任務(wù)都是同一個jar,讀取一段時間后還是會報錯;

解決方案:其實我們可以把在同一個mysql庫里面的表放到同一個source里面;

解決代碼:我們是mysql同步到starrocks里面,使用的sr官方的sink function,不同sink的可以參考這個源代碼;

官方是推薦采用StarRocksSink.sink(options)這種方式,我們查看源碼,其實是采用的v2這一個fun,點進去發(fā)現(xiàn),sr官方對數(shù)據(jù)進行了處理,只需要匹配對應的類型即可;
【Flink】關(guān)于jvm元空間溢出,mysql binlog沖突的問題解決,大數(shù)據(jù),flink,jvm,mysql
所以我們只需要對mysqlSource進行一個算子轉(zhuǎn)換即可,關(guān)鍵代碼如下:

MySqlSourceBuilder<String> builder = new MySqlSourceBuilder<>();
        MySqlSource<String> mySqlSource = builder
                .hostname(srcHost)
                .port(3306)
                .databaseList(srcDb)
                // 格式 db.table,db.table2......
                .tableList(srcTable)
                .username(srcUsername)
                .password(srcPassword)
                .jdbcProperties(jbdcProperties)
                .debeziumProperties(properties)
                // 這里反序列化我進行了StarRocks增刪類型處理,具體可以看我這片文章https://blog.csdn.net/JGMa_TiMo/article/details/128327546
                .deserializer(jsonStringDebeziumDeserializationSchema)
                .serverId("5400-6400")
                .build();

        DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[<< job: >>" + propKey + "]");
//                .setParallelism(parallelism);

        // todo 這里是關(guān)鍵地方,我們反序列化只能返回flink認可的類型,一般都是string,這里轉(zhuǎn)換成上面sr可以處理的對象 StarRocksSinkRowDataWithMeta
        SingleOutputStreamOperator<StarRocksSinkRowDataWithMeta> streamOperator = streamSource.flatMap(new FlatMapFunction<String, StarRocksSinkRowDataWithMeta>() {
            @Override
            public void flatMap(String value, Collector<StarRocksSinkRowDataWithMeta> collector) throws Exception {
                HashMap hashMap = JsonUtils.parseObject(value, HashMap.class);
                StarRocksSinkRowDataWithMeta sinkRowDataWithMeta = new StarRocksSinkRowDataWithMeta();
                sinkRowDataWithMeta.addDataRow(value);
                assert hashMap != null;
                sinkRowDataWithMeta.setTable(hashMap.get("__table").toString());
                sinkRowDataWithMeta.setDatabase(sinkDb);
                collector.collect(sinkRowDataWithMeta);
            }
        }).name("Data Filtering");

        StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://" + sinkHost + ":9030?characterEncoding=utf-8&useSSL=false&connectionTimeZone=Asia/Shanghai")
                .withProperty("load-url", sinkHost + ":8030")
                .withProperty("database-name", sinkDb)
                .withProperty("username", sinkUsername)
                .withProperty("password", sinkPassword)
                // 這里的設(shè)置會被多srcTable覆蓋
                .withProperty("table-name", "")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .build();

		// 這里查看sr sink源碼實際使用的是這個fun,我們不要使用SinkFunctionFactory生成:會泛型不支持
        streamOperator.addSink(new StarRocksDynamicSinkFunctionV2<>(sinkOptions))
                .name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());
//        streamOperator.addSink(StarRocksSink.sink(sinkOptions))
//                .name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());

        env.execute(propKey + "<< stream sync job >>" + srcHost + srcDb);

解讀:原理就是我們把原來7張在一個數(shù)據(jù)庫的表放到一個flink source中讀取,在指定傳輸?shù)侥莻€starrocks表時,官方已經(jīng)實現(xiàn)了代碼支持,我們只需要增加一個flink算子轉(zhuǎn)換成sink支持的對象即可,(關(guān)聯(lián)一個source對應多個sink解決思路)

問題二:我們是采用datastream api開發(fā)的flink任務(wù),在web-ui界面提交任務(wù),造成taskManager的jvm Metaspace一直增長直到節(jié)點掛掉。

報錯就不貼了,就是taskmanager會自動掛掉,查看tm的日志是oom異常:jvm metaspace溢出;

問題分析:我們采用web-ui多次提交flink任務(wù)時,flink是動態(tài)類加載,所以不會釋放上一個jar的元空間,才會造成jvm垃圾不回收;

可以看官方的issues:
https://issues.apache.org/jira/browse/FLINK-11205
https://issues.apache.org/jira/browse/FLINK-16408

問題解決:把jar放到flink/lib目錄下就可以了,這樣flink會優(yōu)先加載父加載器中的類;

這里需要注意和以前的jar會造成版本沖突,具體解決你可以根據(jù)報錯信息慢慢調(diào)試,我這里貼一個我的環(huán)境信息:
當前環(huán)境中的lib包
【Flink】關(guān)于jvm元空間溢出,mysql binlog沖突的問題解決,大數(shù)據(jù),flink,jvm,mysql
我增加的lib包
【Flink】關(guān)于jvm元空間溢出,mysql binlog沖突的問題解決,大數(shù)據(jù),flink,jvm,mysql
下面是我的jar包依賴,打完包放到lib中即可

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.15.3</flink.version>
        <flink.connector.sr>1.2.5_flink-1.15</flink.connector.sr>
        <flink.connector.mysql>2.3.0</flink.connector.mysql>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        
        <!-- 基礎(chǔ)包 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>${flink.connector.sr}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink.connector.mysql}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-test-utils</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.4</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.3.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.7.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2</artifactId>
            <version>2.8.3-10.0</version>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.15</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.3.26</version>
        </dependency>

        <dependency>
            <groupId>org.yaml</groupId>
            <artifactId>snakeyaml</artifactId>
            <version>2.0</version>
        </dependency>

    </dependencies>

問題二:最后就是在web-ui提交任務(wù),你也可以用命令行,這里我用的源碼包,就是我開發(fā)的實際編寫代碼,就幾十K

【Flink】關(guān)于jvm元空間溢出,mysql binlog沖突的問題解決,大數(shù)據(jù),flink,jvm,mysql
最后如果解決了你的問題,請點個贊吧【Flink】關(guān)于jvm元空間溢出,mysql binlog沖突的問題解決,大數(shù)據(jù),flink,jvm,mysql文章來源地址http://www.zghlxwxcb.cn/news/detail-695941.html

到了這里,關(guān)于【Flink】關(guān)于jvm元空間溢出,mysql binlog沖突的問題解決的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 對比flink cdc和canal獲取mysql binlog優(yōu)缺點

    Flink CDC和Canal都是用于獲取MySQL binlog的工具,但是有以下幾點優(yōu)缺點對比: Flink CDC是一個基于Flink的庫,可以直接在Flink中使用,無需額外的組件或服務(wù),而Canal是一個獨立的服務(wù),需要單獨部署和運行,增加了系統(tǒng)的復雜度和成本 Flink CDC支持多種數(shù)據(jù)庫的數(shù)據(jù)變化捕獲,如

    2024年02月11日
    瀏覽(31)
  • Flink批處理metaspace內(nèi)存溢出問題

    Flink批處理metaspace內(nèi)存溢出問題

    早上過來發(fā)現(xiàn)定時任務(wù)出現(xiàn)告警,F(xiàn)link Jobs運行失敗,登錄Flinkweb后臺一看,所有jobs都沒了,slot也為0。 查看Flink日志,有以下錯誤異常: 根據(jù)錯誤異常不難得出,是因為metaspace內(nèi)存溢出導致的。 通過日志能觀察到是一個批處理任務(wù)(FlinkJobCheatFind)導致;這個批處理任務(wù)是通過

    2024年02月08日
    瀏覽(20)
  • JVM:全面理解線上服務(wù)器內(nèi)存溢出(OOM)問題處理方案(一)

    JVM:全面理解線上服務(wù)器內(nèi)存溢出(OOM)問題處理方案(一)

    前段時間生產(chǎn)上遇到了OOM問題,導致服務(wù)出現(xiàn)了短時間的不可用,還好處理及時,否則也將釀成大禍。OOM問題也是生產(chǎn)中比較重要的問題,所以本期我們針對OOM問題特別講解,結(jié)合理論與實際案例來帶大家徹底攻克OOM問題處理。 要解決問題,我們首先要清楚問題產(chǎn)生的原因。

    2024年02月12日
    瀏覽(25)
  • flink開發(fā)常見問題 —— flink-kafka 依賴版本沖突問題

    flink開發(fā)常見問題 —— flink-kafka 依賴版本沖突問題

    由于 flink / kafka 的版本不斷更新,創(chuàng)建項目的時候就應當考慮清楚這幾個依賴庫的版本問題,盡可能地與實際場景保持一致,比如服務(wù)器上部署的 kafka 是哪個版本,flink 是哪個版本,從而確定我們需要開發(fā)的是哪個版本,并且在真正的開發(fā)工作開始之前,應當先測試一下保證

    2024年02月07日
    瀏覽(30)
  • 關(guān)于git 解決分支沖突問題(具體操作,包含截圖,教你一步一步解決沖突問題)

    關(guān)于git 解決分支沖突問題(具體操作,包含截圖,教你一步一步解決沖突問題)

    當在Git中有多個開發(fā)者在同一個分支上工作時,可能會發(fā)生分支沖突。分支沖突指的是多個開發(fā)者在同一時間修改相同的代碼文件,導致Git無法自動合并這些更改。 比如說:我在github上進行了md文件的修改,我在本地倉庫里面也進行md文件的修改,這個時候,提交的時候會出

    2024年02月21日
    瀏覽(28)
  • 關(guān)于本機Docker與vmware沖突問題

    關(guān)于本機Docker與vmware沖突問題

    在本機安裝docker,目前以VMware可以正常使用為例 一、Docker與VMware沖突的原因: 微軟的hyper-V(虛擬化軟件),使用docker就必須要啟動hyper-V服務(wù),當開始hyper-V啟動后,VMware的Windows10會由于啟動了hyper-V導致VMware的Windows10無法啟動,產(chǎn)生沖突。 如下:hyper-V關(guān)閉,docker情況(顯示

    2024年02月04日
    瀏覽(16)
  • Flink CDC報The connector is trying to read binlog starting at xxx but this is no longer available問題解決

    Flink CDC報The connector is trying to read binlog starting at xxx but this is no longer available問題解決

    問題是筆者最近在使用FlinkCDC 2.3.0 捕獲MySQL binlog日志時遇到的,MySQL使用的阿里云的RDS, MysqlCDC 使用讀賬號以 Initinal 模式,任務(wù)已經(jīng)運行了一段時間突然報的錯,之前在使用FlinkCDC時也曾遇到過,設(shè)置了一些參數(shù)后沒有再出現(xiàn)過,一直比較忙沒有來得及總結(jié)下來。但是今天同

    2024年02月07日
    瀏覽(24)
  • 關(guān)于微軟Microsoft 365 開發(fā)中心 郵箱域 CNAME?(別名)解析指向記錄沖突的問題解決辦法!

    關(guān)于微軟Microsoft 365 開發(fā)中心 郵箱域 CNAME?(別名)解析指向記錄沖突的問題解決辦法!

    關(guān)于微軟Microsoft 365開發(fā)中心 郵箱域的DNS添加?CNAME?(別名)解析指向記錄,CNAME沖突的問題解決辦法!? 先說明問題,如下圖: 我用的是華為平臺域名解析: ?一直出現(xiàn)? ?CNAME?(別名)指向沖突! 按照微軟的要求怎么也添加不上???CNAME?(別名)記錄 ----------------試過許

    2024年02月02日
    瀏覽(17)
  • jvm里的內(nèi)存溢出

    jvm里的內(nèi)存溢出

    目錄 堆溢出 虛擬機棧和本地方法棧溢出(棧溢出很少出現(xiàn)) 方法區(qū)和運行時常量池溢出 ?本機內(nèi)存直接溢出(實際中很少出現(xiàn)、了解即可) 堆溢出:最常見的是大list,list里面有很多元 素 堆溢出該怎么解決 : ????????定位到導致內(nèi)存溢出的對象 ????????判斷是否是

    2024年02月13日
    瀏覽(29)
  • Java jvm 內(nèi)存溢出分析

    Java jvm 內(nèi)存溢出分析

    我們經(jīng)常用visualVm監(jiān)控Jvm的內(nèi)存,cpu,線程的使用情況,通??梢愿鶕?jù)內(nèi)存不斷增長來判斷內(nèi)存是否存在不釋放。但是我們不可能時時盯著去看,這里涉及jvm堆內(nèi)存配置,堆內(nèi)存參數(shù)配置和調(diào)優(yōu)會在其他章節(jié)編寫。 如果真是內(nèi)存溢出了,線上出現(xiàn)的我們需要配置JVm內(nèi)存溢出,

    2024年02月09日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包