環境
flink 1.15.3(此時最新版本為1.16.1)
mysql 5.7+
starrocks 2.5.2
mysql同步表結構
mysql中的timestamp字段是可以正常同步的,但是多了8小時,設置了mysql鏈接屬性也沒效果
-- Source table for the sync demo. `create_date` (DATETIME) and `create_time` (TIMESTAMP)
-- are the two temporal columns whose rendering the converter below controls.
CREATE TABLE `temp_flink` (
    `id`          INT(11)      NOT NULL,
    `name`        VARCHAR(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
    `remark`      VARCHAR(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
    `create_date` DATETIME     DEFAULT NULL,
    `create_time` TIMESTAMP    NULL DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4
  COLLATE = utf8mb4_general_ci;
參考下方的鏈接有兩種方式;文章來源:http://www.zghlxwxcb.cn/news/detail-596301.html
這里使用單獨的轉換器代碼如下
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Properties;
/**
 * Debezium custom converter that renders MySQL temporal columns as plain strings
 * (time-zone and format handling for MySQL date/time fields).
 *
 * <p>Columns whose SQL type name is {@code DATE}, {@code TIME}, {@code DATETIME} or
 * {@code TIMESTAMP} are registered with an optional string schema; every other column is
 * left untouched. {@code DATETIME} and {@code TIMESTAMP} values are rendered in ISO form
 * with a space instead of the {@code 'T'} separator.
 *
 * <p>Optional configuration: {@code format.timestamp.zone} - a {@link ZoneId} id (for
 * example {@code Asia/Shanghai}) into which {@code TIMESTAMP} values are shifted before
 * formatting. When absent, the JVM default zone is used. NOTE(review): this key is expected
 * to arrive with the converter-name prefix already stripped, e.g. set as
 * {@code <converterName>.format.timestamp.zone} in the connector properties - confirm
 * against the Debezium version in use.
 *
 * @author JGMa
 */
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    /** Property naming the target zone for TIMESTAMP columns. */
    private static final String TIMESTAMP_ZONE_PROPERTY = "format.timestamp.zone";

    /** Upper bound (exclusive) of the second-of-day range accepted by {@link LocalTime}. */
    private static final long SECONDS_PER_DAY = 86_400L;

    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ISO_DATE;
    private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ISO_TIME;
    private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ISO_DATE_TIME;
    private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ISO_DATE_TIME;

    /** Zone that TIMESTAMP values are shifted into; defaults to the JVM zone. */
    private ZoneId timestampZoneId = ZoneId.systemDefault();

    /**
     * Reads the optional {@code format.timestamp.zone} property.
     *
     * @param props converter properties; may be {@code null} or empty
     * @throws IllegalArgumentException if the configured zone id is not a valid {@link ZoneId}
     */
    @Override
    public void configure(Properties props) {
        if (props == null) {
            return;
        }
        String zone = props.getProperty(TIMESTAMP_ZONE_PROPERTY);
        if (zone == null || zone.trim().isEmpty()) {
            return;
        }
        try {
            timestampZoneId = ZoneId.of(zone.trim());
        } catch (DateTimeException e) {
            // Fail fast: silently falling back to the JVM zone would emit shifted timestamps.
            throw new IllegalArgumentException(
                    "Invalid " + TIMESTAMP_ZONE_PROPERTY + " value: " + zone, e);
        }
    }

    /**
     * Registers a string converter for DATE, TIME, DATETIME and TIMESTAMP columns.
     *
     * @param column the column being inspected
     * @param registration callback used to register the schema and converter
     */
    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        // Locale.ROOT keeps the comparison stable under locales with special casing rules
        // (e.g. Turkish, where "time".toUpperCase() is not "TIME").
        String sqlType = column.typeName().toUpperCase(Locale.ROOT);
        final String schemaName;
        final Converter converter;
        switch (sqlType) {
            case "DATE":
                schemaName = "com.darcytech.debezium.date.string";
                converter = this::convertDate;
                break;
            case "TIME":
                schemaName = "com.darcytech.debezium.time.string";
                converter = this::convertTime;
                break;
            case "DATETIME":
                schemaName = "com.darcytech.debezium.datetime.string";
                converter = this::convertDateTime;
                break;
            case "TIMESTAMP":
                schemaName = "com.darcytech.debezium.timestamp.string";
                converter = this::convertTimestamp;
                break;
            default:
                // Not a temporal column handled here: leave the default conversion in place.
                return;
        }
        registration.register(SchemaBuilder.string().optional().name(schemaName), converter);
    }

    /**
     * Formats a DATE value as an ISO date.
     *
     * @param input a {@link LocalDate}, an {@link Integer} epoch-day count, any other object, or {@code null}
     * @return the ISO date string, {@code String.valueOf(input)} for unrecognised types, or {@code null}
     */
    private String convertDate(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof LocalDate) {
            return DATE_FORMATTER.format((LocalDate) input);
        }
        if (input instanceof Integer) {
            return DATE_FORMATTER.format(LocalDate.ofEpochDay((Integer) input));
        }
        return String.valueOf(input);
    }

    /**
     * Formats a TIME value.
     *
     * <p>A {@link Duration} inside {@code [00:00:00, 24:00:00)} is formatted as an ISO time of day.
     * Durations outside that range (MySQL TIME allows negative values and values up to
     * 838:59:59) cannot be represented by {@link LocalTime} and are rendered as
     * {@code [-]HH:MM:SS[.fraction]} instead of throwing.
     *
     * @param input a {@link Duration}, any other object, or {@code null}
     * @return the formatted time, {@code String.valueOf(input)} for unrecognised types, or {@code null}
     */
    private String convertTime(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof Duration) {
            Duration duration = (Duration) input;
            if (!duration.isNegative() && duration.getSeconds() < SECONDS_PER_DAY) {
                LocalTime time = LocalTime.ofSecondOfDay(duration.getSeconds()).withNano(duration.getNano());
                return TIME_FORMATTER.format(time);
            }
            return formatOutOfDayTime(duration);
        }
        return String.valueOf(input);
    }

    /** Renders a duration that is negative or spans 24h and more as {@code [-]HH:MM:SS[.fraction]}. */
    private static String formatOutOfDayTime(Duration duration) {
        Duration magnitude = duration.abs();
        long totalSeconds = magnitude.getSeconds();
        StringBuilder text = new StringBuilder();
        if (duration.isNegative()) {
            text.append('-');
        }
        text.append(String.format(Locale.ROOT, "%02d:%02d:%02d",
                totalSeconds / 3600, (totalSeconds % 3600) / 60, totalSeconds % 60));
        int nanos = magnitude.getNano();
        if (nanos != 0) {
            // Drop trailing zeros so the fraction looks like the ISO formatter's output.
            String fraction = String.format(Locale.ROOT, "%09d", nanos);
            int end = fraction.length();
            while (fraction.charAt(end - 1) == '0') {
                end--;
            }
            text.append('.').append(fraction, 0, end);
        }
        return text.toString();
    }

    /**
     * Formats a DATETIME value as {@code yyyy-MM-dd HH:mm:ss[.fraction]}.
     *
     * @param input a {@link LocalDateTime}, any other object, or {@code null}
     * @return the formatted date-time, {@code String.valueOf(input)} for unrecognised types, or {@code null}
     */
    private String convertDateTime(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof LocalDateTime) {
            return DATETIME_FORMATTER.format((LocalDateTime) input).replace('T', ' ');
        }
        return String.valueOf(input);
    }

    /**
     * Formats a TIMESTAMP value in the configured target zone.
     *
     * @param input a {@link ZonedDateTime}, any other object, or {@code null}
     * @return the formatted local date-time, {@code String.valueOf(input)} for unrecognised types, or {@code null}
     */
    private String convertTimestamp(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof ZonedDateTime) {
            // Original author's note: MySQL stores TIMESTAMP as UTC, so the incoming
            // ZonedDateTime is in UTC; shift the same instant into the target zone.
            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return TIMESTAMP_FORMATTER.format(localDateTime).replace('T', ' ');
        }
        return String.valueOf(input);
    }
}
使用
{
/**
 * Streams change events of the MySQL table {@code xcode.temp_flink} into the StarRocks
 * table of the same name, using the custom Debezium date/time converter so temporal
 * columns arrive as formatted strings.
 *
 * <p>Connection settings are hard-coded for this demo. NOTE(review): move hosts and
 * credentials to external configuration before using this outside a test environment.
 *
 * @param args unused
 */
public static void main(String[] args) {
    String tableName = "temp_flink";
    String srcHost = "192.168.10.14";
    String srcDatabase = "xcode";
    String srcUsername = "root";
    String srcPassword = "123456";
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    Properties mysqlProperties = new Properties();
    // mysqlProperties.setProperty("characterEncoding","UTF-8");
    // mysqlProperties.setProperty("connectionTimeZone","Asia/Shanghai");
    // Custom date/time converter configuration
    mysqlProperties.setProperty("converters", "dateConverters");
    mysqlProperties.setProperty("dateConverters.type", "com.txlc.flink.core.MySqlDateTimeConverter");
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname(srcHost)
            // .jdbcProperties(mysqlProperties)
            .port(3306)
            .databaseList(srcDatabase)
            .tableList(srcDatabase + "." + tableName)
            .username(srcUsername)
            .password(srcPassword)
            // .serverTimeZone("Asia/Shanghai")
            // This is the key part: the converter is wired in through the Debezium properties
            .debeziumProperties(mysqlProperties)
            .deserializer(new JsonStringDebeziumDeserializationSchema())
            .build();
    DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[temp_flink-source]")
            .setParallelism(1);
    streamSource.addSink(StarRocksSink.sink(
            // the sink options
            StarRocksSinkOptions.builder()
                    .withProperty("jdbc-url", "jdbc:mysql://192.168.10.245:9030?characterEncoding=utf-8")
                    .withProperty("load-url", "192.168.10.245:8030")
                    .withProperty("database-name", "xcode")
                    .withProperty("username", "root")
                    .withProperty("password", "123456")
                    .withProperty("table-name", tableName)
                    // Since version 2.4, updating only some columns of a primary key model is supported.
                    // The columns to update can be specified with the following two properties.
                    // .withProperty("sink.properties.partial_update", "true")
                    // .withProperty("sink.properties.columns", "k1,k2,k3")
                    .withProperty("sink.properties.format", "json")
                    .withProperty("sink.properties.strip_outer_array", "true")
                    // Sink parallelism; with more than one, consider how to keep the data ordered
                    .withProperty("sink.parallelism", "1")
                    .build())
    ).name(">>>StarRocks temp_flink Sink<<<");
    try {
        env.execute("temp_flink stream sync");
    } catch (Exception e) {
        // Pass the message for the placeholder and the exception last so the logger
        // prints the stack trace; a lone throwable argument leaves "{}" unfilled.
        log.error("[sync error] info : {}", e.getMessage(), e);
    }
}
}
參考資料
https://blog.csdn.net/cloudbigdata/article/details/122935333
https://blog.csdn.net/WuBoooo/article/details/127387144
文章來源地址 http://www.zghlxwxcb.cn/news/detail-596301.html
到了這里,關于Flink cdc同步mysql到starrocks(日期時間格式/時區處理)的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!