問題一:7張表是同一個mysql中的,我們進行增量同步時分別用不同的flink任務(wù)讀取,造成mysql server-id沖突問題,如下:
Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event ‘’ at 4, the last event read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118, the last byte read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118. Error code: 1236; SQLSTATE: HY000.
問題分析:主要是多個任務(wù)都讀取的同一個binlog造成serverid沖突;
網(wǎng)上有設(shè)置server-id的方法,這個是可以解決的,但是需要分方案來談;
例如:我們是需要一個任務(wù)通用的(7張表重復提交7次,更改flink傳參)
這種就不適合我們,因為每次的任務(wù)都是同一個jar,讀取一段時間后還是會報錯;
解決方案:其實我們可以把在同一個mysql庫里面的表放到同一個source里面;
解決代碼:我們是mysql同步到starrocks里面,使用的sr官方的sink function,不同sink的可以參考這個源代碼;
官方是推薦采用StarRocksSink.sink(options)這種方式,我們查看源碼,其實是采用的v2這一個fun,點進去發(fā)現(xiàn),sr官方對數(shù)據(jù)進行了處理,只需要匹配對應的類型即可;
所以我們只需要對mysqlSource進行一個算子轉(zhuǎn)換即可,關(guān)鍵代碼如下:
MySqlSourceBuilder<String> builder = new MySqlSourceBuilder<>();
MySqlSource<String> mySqlSource = builder
.hostname(srcHost)
.port(3306)
.databaseList(srcDb)
// 格式 db.table,db.table2......
.tableList(srcTable)
.username(srcUsername)
.password(srcPassword)
.jdbcProperties(jbdcProperties)
.debeziumProperties(properties)
// 這里反序列化我進行了StarRocks增刪類型處理,具體可以看我這片文章https://blog.csdn.net/JGMa_TiMo/article/details/128327546
.deserializer(jsonStringDebeziumDeserializationSchema)
.serverId("5400-6400")
.build();
DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[<< job: >>" + propKey + "]");
// .setParallelism(parallelism);
// todo 這里是關(guān)鍵地方,我們反序列化只能返回flink認可的類型,一般都是string,這里轉(zhuǎn)換成上面sr可以處理的對象 StarRocksSinkRowDataWithMeta
SingleOutputStreamOperator<StarRocksSinkRowDataWithMeta> streamOperator = streamSource.flatMap(new FlatMapFunction<String, StarRocksSinkRowDataWithMeta>() {
@Override
public void flatMap(String value, Collector<StarRocksSinkRowDataWithMeta> collector) throws Exception {
HashMap hashMap = JsonUtils.parseObject(value, HashMap.class);
StarRocksSinkRowDataWithMeta sinkRowDataWithMeta = new StarRocksSinkRowDataWithMeta();
sinkRowDataWithMeta.addDataRow(value);
assert hashMap != null;
sinkRowDataWithMeta.setTable(hashMap.get("__table").toString());
sinkRowDataWithMeta.setDatabase(sinkDb);
collector.collect(sinkRowDataWithMeta);
}
}).name("Data Filtering");
StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql://" + sinkHost + ":9030?characterEncoding=utf-8&useSSL=false&connectionTimeZone=Asia/Shanghai")
.withProperty("load-url", sinkHost + ":8030")
.withProperty("database-name", sinkDb)
.withProperty("username", sinkUsername)
.withProperty("password", sinkPassword)
// 這里的設(shè)置會被多srcTable覆蓋
.withProperty("table-name", "")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.build();
// 這里查看sr sink源碼實際使用的是這個fun,我們不要使用SinkFunctionFactory生成:會泛型不支持
streamOperator.addSink(new StarRocksDynamicSinkFunctionV2<>(sinkOptions))
.name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());
// streamOperator.addSink(StarRocksSink.sink(sinkOptions))
// .name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());
env.execute(propKey + "<< stream sync job >>" + srcHost + srcDb);
解讀
:原理就是我們把原來7張在一個數(shù)據(jù)庫的表放到一個flink source中讀取,在指定傳輸?shù)侥莻€starrocks表時,官方已經(jīng)實現(xiàn)了代碼支持,我們只需要增加一個flink算子轉(zhuǎn)換成sink支持的對象即可,(關(guān)聯(lián)一個source對應多個sink解決思路)
問題二:我們是采用datastream api開發(fā)的flink任務(wù),在web-ui界面提交任務(wù),造成taskManager的jvm Metaspace一直增長直到節(jié)點掛掉。
報錯就不貼了,就是taskmanager會自動掛掉,查看tm的日志是oom異常:jvm metaspace溢出;
問題分析:我們采用web-ui多次提交flink任務(wù)時,flink是動態(tài)類加載,所以不會釋放上一個jar的元空間,才會造成jvm垃圾不回收;
可以看官方的issues:
https://issues.apache.org/jira/browse/FLINK-11205
https://issues.apache.org/jira/browse/FLINK-16408
問題解決:把jar放到flink/lib目錄下就可以了,這樣flink會優(yōu)先加載父加載器中的類;
這里需要注意和以前的jar會造成版本沖突,具體解決你可以根據(jù)報錯信息慢慢調(diào)試,我這里貼一個我的環(huán)境信息:當前環(huán)境中的lib包
我增加的lib包
下面是我的jar包依賴,打完包放到lib中即可
文章來源:http://www.zghlxwxcb.cn/news/detail-695941.html
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.15.3</flink.version>
<flink.connector.sr>1.2.5_flink-1.15</flink.connector.sr>
<flink.connector.mysql>2.3.0</flink.connector.mysql>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 基礎(chǔ)包 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${flink.connector.sr}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.connector.mysql}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-test-utils</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.3.4</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.8.3-10.0</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.15</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.3.26</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>2.0</version>
</dependency>
</dependencies>
問題二:最后就是在web-ui提交任務(wù),你也可以用命令行,這里我用的源碼包,就是我開發(fā)的實際編寫代碼,就幾十K
最后如果解決了你的問題,請點個贊吧文章來源地址http://www.zghlxwxcb.cn/news/detail-695941.html
到了這里,關(guān)于【Flink】關(guān)于jvm元空間溢出,mysql binlog沖突的問題解決的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!