目錄
前言:
1、springboot引入依賴:
2、yml配置文件
3、創(chuàng)建SQL server CDC變更數(shù)據(jù)監(jiān)聽器
4、反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對象
5、CDC 數(shù)據(jù)實體類
6、自定義ApplicationContextUtil
7、自定義sink 交由spring管理,處理變更數(shù)據(jù)
前言:
我的場景是從SQL Server數(shù)據(jù)庫獲取指定表的增量數(shù)據(jù),查詢了很多獲取增量數(shù)據(jù)的方案,最終選擇了Flink的 flink-connector-sqlserver-cdc ,這個需要用到SQL Server 的CDC(變更數(shù)據(jù)捕獲),通過CDC來獲取增量數(shù)據(jù),處理數(shù)據(jù)前需要對數(shù)據(jù)庫進(jìn)行配置,如果不清楚如何配置可以看看我這篇文章:《SQL Server數(shù)據(jù)庫開啟CDC變更數(shù)據(jù)捕獲操作指引》
廢話不多說,直接上干貨,如有不足還請指正。
1、springboot引入依賴:
<properties>
<flink.version>1.16.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>9.4.0.jre8</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-sqlserver-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.13.6</version>
</dependency>
</dependencies>
2、yml配置文件
spring:
datasource:
url: jdbc:sqlserver://127.0.0.1:1433;DatabaseName=HM_5001
username: sa
password: root
driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
# 實時同步SQL Server數(shù)據(jù)庫配置
CDC:
DataSource:
host: 127.0.0.1
port: 1433
database: HM_5001
tableList: dbo.t1,dbo.Tt2,dbo.t3,dbo.t4
username: sa
password: sa
3、創(chuàng)建SQL server CDC變更數(shù)據(jù)監(jiān)聽器
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.io.Serializable;
/**
* SQL server CDC變更監(jiān)聽器
**/
@Component
@Slf4j
public class SQLServerCDCListener implements ApplicationRunner, Serializable {
/**
* CDC數(shù)據(jù)源配置
*/
@Value("${CDC.DataSource.host}")
private String host;
@Value("${CDC.DataSource.port}")
private String port;
@Value("${CDC.DataSource.database}")
private String database;
@Value("${CDC.DataSource.tableList}")
private String tableList;
@Value("${CDC.DataSource.username}")
private String username;
@Value("${CDC.DataSource.password}")
private String password;
private final DataChangeSink dataChangeSink;
public SQLServerCDCListener(DataChangeSink dataChangeSink) {
this.dataChangeSink = dataChangeSink;
}
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("開始啟動Flink CDC獲取ERP變更數(shù)據(jù)......");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();
DataStream<DataChangeInfo> streamSource = env
.addSource(dataChangeInfoMySqlSource, "SQLServer-source")
.setParallelism(1);
streamSource.addSink(dataChangeSink);
env.execute("SQLServer-stream-cdc");
}
/**
* 構(gòu)造CDC數(shù)據(jù)源
*/
private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
String[] tables = tableList.replace(" ", "").split(",");
return SqlServerSource.<DataChangeInfo>builder()
.hostname(host)
.port(Integer.parseInt(port))
.database(database) // monitor sqlserver database
.tableList(tables) // monitor products table
.username(username)
.password(password)
/*
*initial初始化快照,即全量導(dǎo)入后增量導(dǎo)入(檢測更新數(shù)據(jù)寫入)
* latest:只進(jìn)行增量導(dǎo)入(不讀取歷史變化)
*/
.startupOptions(StartupOptions.latest())
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
}
}
4、反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對象
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;
/**
* SQLServer消息讀取自定義序列化
**/
@Slf4j
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<DataChangeInfo> {
public static final String TS_MS = "ts_ms";
public static final String BEFORE = "before";
public static final String AFTER = "after";
public static final String SOURCE = "source";
public static final String CREATE = "CREATE";
public static final String UPDATE = "UPDATE";
/**
*
* 反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對象
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
try {
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct struct = (Struct) sourceRecord.value();
final Struct source = struct.getStruct(SOURCE);
DataChangeInfo dataChangeInfo = new DataChangeInfo();
dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
// 獲取操作類型 CREATE UPDATE DELETE 1新增 2修改 3刪除
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toUpperCase();
int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
dataChangeInfo.setEventType(eventType);
dataChangeInfo.setDatabase(database);
dataChangeInfo.setTableName(tableName);
ZoneId zone = ZoneId.systemDefault();
Long timestamp = Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis);
dataChangeInfo.setChangeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone));
//7.輸出數(shù)據(jù)
collector.collect(dataChangeInfo);
} catch (Exception e) {
log.error("SQLServer消息讀取自定義序列化報錯:{}", e.getMessage());
e.printStackTrace();
}
}
/**
*
* 從源數(shù)據(jù)獲取出變更之前或之后的數(shù)據(jù)
*/
private JSONObject getJsonObject(Struct value, String fieldElement) {
Struct element = value.getStruct(fieldElement);
JSONObject jsonObject = new JSONObject();
if (element != null) {
Schema afterSchema = element.schema();
List<Field> fieldList = afterSchema.fields();
for (Field field : fieldList) {
Object afterValue = element.get(field);
jsonObject.put(field.name(), afterValue);
}
}
return jsonObject;
}
@Override
public TypeInformation<DataChangeInfo> getProducedType() {
return TypeInformation.of(DataChangeInfo.class);
}
}
5、CDC 數(shù)據(jù)實體類
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* CDC 數(shù)據(jù)實體類
*/
@Data
public class DataChangeInfo implements Serializable {
/**
* 數(shù)據(jù)庫名
*/
private String database;
/**
* 表名
*/
private String tableName;
/**
* 變更時間
*/
private LocalDateTime changeTime;
/**
* 變更類型 1新增 2修改 3刪除
*/
private Integer eventType;
/**
* 變更前數(shù)據(jù)
*/
private String beforeData;
/**
* 變更后數(shù)據(jù)
*/
private String afterData;
}
6、自定義ApplicationContextUtil
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.io.Serializable;
@Component
public class ApplicationContextUtil implements ApplicationContextAware, Serializable {
/**
* 上下文
*/
private static ApplicationContext context;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.context = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return context;
}
public static <T> T getBean(Class<T> beanClass) {
return context.getBean(beanClass);
}
}
7、自定義sink 交由spring管理,處理變更數(shù)據(jù)
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
* 自定義sink 交由spring管理
* 處理變更數(shù)據(jù)
**/
@Component
@Slf4j
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {
private static final long serialVersionUID = -74375380912179188L;
private UserMapper userMapper;
/**
* 在open()方法中動態(tài)注入Spring容器的類
* 在啟動SpringBoot項目是加載了Spring容器,其他地方可以使用@Autowired獲取Spring容器中的類;
* 但是Flink啟動的項目中,默認(rèn)啟動了多線程執(zhí)行相關(guān)代碼,導(dǎo)致在其他線程無法獲取Spring容器,
* 只有在Spring所在的線程才能使用@Autowired,故在Flink自定義的Sink的open()方法中初始化Spring容器
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
userMapper = ApplicationContextUtil.getBean(UserMapper.class);
}
@Override
public void invoke(DataChangeInfo dataChangeInfo, Context context) {
log.info("收到變更原始數(shù)據(jù):{}", dataChangeInfo);
// TODO 開始處理你的數(shù)據(jù)吧
}
以上是我親自驗證測試的結(jié)果,已發(fā)布生產(chǎn)環(huán)境,如有不足還請指正。
到了這里,關(guān)于實戰(zhàn)Java springboot 采用Flink CDC操作SQL Server數(shù)據(jù)庫獲取增量變更數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!