環(huán)境:Flink 1.15.0,cdc2.3.0
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
目的:為了測(cè)試cdc2.3支持從"specific-offset"啟動(dòng)程序。
代碼如下:
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
String offsetFile = "binlog.000002";
Long offsetPos = 160299739L; //154 219 504
Properties prop = new Properties();
prop.setProperty("snapshot.locking.mode", "none");
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("地址")
.port(端口)
.databaseList("數(shù)據(jù)庫名") // monitor all tables under inventory database
.tableList("數(shù)據(jù)庫名.表名") // set captured table
.username("用戶名")
.password("密碼")
//設(shè)置讀取位置 initial全量, latest增量, specificOffset(binlog指定位置開始讀,該功能cdc2.2版本不支持)
.startupOptions(StartupOptions.specificOffset(offsetFile, Long.valueOf(offsetPos)))
// .startupOptions(StartupOptions.initial())
// .startupOptions(StartupOptions.latest())
.debeziumProperties(prop)
.deserializer(new StringDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.print("====>")
.setParallelism(1);
env.execute();
}
}
報(bào)錯(cuò)如下:
文章來源:http://www.zghlxwxcb.cn/news/detail-732105.html
引入下面依賴,解決報(bào)錯(cuò):文章來源地址http://www.zghlxwxcb.cn/news/detail-732105.html
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.15.0</version>
</dependency>
到了這里,關(guān)于java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter解決的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!