Flink CDC DataStream API timezone issues
When PostgreSQL is the source, Date, Timestamp, and similar column types show two problems when read through CDC:
Time and date values arrive converted to int/long epoch numbers rather than temporal types.
After synchronization, timestamps are 8 hours off from the source table, caused by the timezone difference.
Source table:
Sink table:
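The root cause: Debezium represents temporal columns as plain epoch numbers, and for timestamp-without-timezone columns it encodes the local wall-clock value as if it were UTC. A minimal illustration of what this does to a UTC+8 wall clock (the sample value matches the conversion table later in this article; the zone is an assumption for the example):
import java.time.Instant;
import java.time.ZoneId;
public class DebeziumEpochDemo {
    public static void main(String[] args) {
        // io.debezium.time.MicroTimestamp: wall-clock 2021-01-28 17:29:04 encoded as epoch microseconds "in UTC"
        long rawMicros = 1_611_854_944_000_000L;
        // Rendering that epoch in the server zone (UTC+8) adds a spurious 8 hours:
        System.out.println(Instant.ofEpochMilli(rawMicros / 1_000)
                .atZone(ZoneId.of("Asia/Shanghai"))
                .toLocalDateTime()); // prints 2021-01-29T01:29:04
    }
}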
Solution: handle the conversion inside a custom deserialization schema.
Java code:
package pg.cdc.ds;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1. Create the JSON object that will hold the final record
JSONObject result = new JSONObject();
Struct value = (Struct) sourceRecord.value();
//2. Extract the database, schema, and table names
Struct sourceStruct = value.getStruct("source");
String database = sourceStruct.getString("db");
String schema = sourceStruct.getString("schema");
String tableName = sourceStruct.getString("table");
//3. Extract the "before" image
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
// SimpleDateFormat formats in the JVM default timezone; the 8-hour shifts below assume that zone is UTC+8
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.MicroTimestamp".equals(field.schema().name())) {
if (beforeValue != null) {
long times = (long) beforeValue / 1000; // microseconds -> milliseconds
String dateTime = sdf.format(new Date(times - 8 * 60 * 60 * 1000)); // shift back 8 hours
beforeJson.put(field.name(), dateTime);
}
}
else if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.NanoTimestamp".equals(field.schema().name())) {
if (beforeValue != null) {
long times = (long) beforeValue / 1000 / 1000; // nanoseconds -> milliseconds
String dateTime = sdf.format(new Date(times - 8 * 60 * 60 * 1000)); // shift back 8 hours
beforeJson.put(field.name(), dateTime);
}
} else if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.Timestamp".equals(field.schema().name())) {
if (beforeValue != null) {
long times = (long) beforeValue; // already epoch milliseconds
String dateTime = sdf.format(new Date(times - 8 * 60 * 60 * 1000)); // shift back 8 hours
beforeJson.put(field.name(), dateTime);
}
} else if("int32".equals(field.schema().type().getName()) && "io.debezium.time.Date".equals(field.schema().name())){
if(beforeValue != null) {
int times = (int) beforeValue; // days since epoch
String dateTime = sdf1.format(new Date(times * 24 * 60 * 60L * 1000));
beforeJson.put(field.name(), dateTime);
}
}
else {
beforeJson.put(field.name(), beforeValue);
}
}
}
//4. Extract the "after" image
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.MicroTimestamp".equals(field.schema().name())) {
if (afterValue != null) {
long times = (long) afterValue / 1000; // microseconds -> milliseconds
String dateTime = sdf.format(new Date(times - 8 * 60 * 60 * 1000)); // shift back 8 hours
afterJson.put(field.name(), dateTime);
}
}
else if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.NanoTimestamp".equals(field.schema().name())) {
if (afterValue != null) {
long times = (long) afterValue / 1000 / 1000; // nanoseconds -> milliseconds
String dateTime = sdf.format(new Date(times - 8 * 60 * 60 * 1000)); // shift back 8 hours
afterJson.put(field.name(), dateTime);
}
} else if ("int64".equals(field.schema().type().getName()) && "io.debezium.time.Timestamp".equals(field.schema().name())) {
if (afterValue != null) {
long times = (long) afterValue; // already epoch milliseconds
String dateTime = sdf.format(new Date(times - 8 * 60 * 60 * 1000)); // shift back 8 hours
afterJson.put(field.name(), dateTime);
}
}
else if("int32".equals(field.schema().type().getName()) && "io.debezium.time.Date".equals(field.schema().name())){
if(afterValue != null) {
int times = (int) afterValue; // days since epoch
String dateTime = sdf1.format(new Date(times * 24 * 60 * 60L * 1000));
afterJson.put(field.name(), dateTime);
}
}
else {
afterJson.put(field.name(), afterValue);
}
}
}
//5. Determine the operation type: CREATE / UPDATE / DELETE (READ for snapshot rows)
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("create".equals(type) || "read".equals(type)) {
type = "insert";
}
//6. Assemble the output JSON
result.put("database", database);
result.put("schema", schema);
result.put("tableName", tableName);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("type", type);
//7. Emit the record
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
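For reference, a minimal sketch of plugging this schema into a PostgreSQL CDC source (connection settings and table names are placeholders; assumes the com.ververica flink-connector-postgres-cdc dependency):
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import pg.cdc.ds.CustomerDeserialization;
public class PgCdcJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DebeziumSourceFunction<String> source = PostgreSQLSource.<String>builder()
                .hostname("localhost")          // placeholder connection settings
                .port(5432)
                .database("postgres")
                .schemaList("public")
                .tableList("public.orders")     // hypothetical table
                .username("postgres")
                .password("postgres")
                .decodingPluginName("pgoutput") // logical decoding plugin
                .deserializer(new CustomerDeserialization())
                .build();
        env.addSource(source).name("pg-cdc-source").print();
        env.execute("pg-cdc");
    }
}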
Scala code (this variant emits Row records and converts through the connector's TemporalConversions helper with an explicit serverTimeZone, instead of hard-coding an 8-hour offset):
import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import com.ververica.cdc.debezium.utils.TemporalConversions
import io.debezium.time._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.{SchemaBuilder, Struct}
import org.apache.kafka.connect.source.SourceRecord
import java.sql
import java.time.{Instant, LocalDateTime, ZoneId}
import scala.collection.JavaConverters._
import scala.util.parsing.json.JSONObject
class StructDebeziumDeserializationSchema(serverTimeZone: String) extends DebeziumDeserializationSchema[Row] {
override def deserialize(sourceRecord: SourceRecord, collector: Collector[Row]): Unit = {
// Parse the primary key
val key = sourceRecord.key().asInstanceOf[Struct]
val keyJs = parseStruct(key)
// Parse the value struct
val value = sourceRecord.value().asInstanceOf[Struct]
val source = value.getStruct("source")
val before = parseStruct(value.getStruct("before"))
val after = parseStruct(value.getStruct("after"))
val row = Row.withNames()
row.setField("table", s"${source.get("db")}.${source.get("table")}")
row.setField("key", keyJs)
row.setField("op", value.get("op"))
row.setField("op_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(source.getInt64("ts_ms")), ZoneId.of(serverTimeZone)))
row.setField("current_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(value.getInt64("ts_ms")), ZoneId.of(serverTimeZone)))
row.setField("before", before)
row.setField("after", after)
collector.collect(row)
}
/** Parse a [[Struct]] into a JSON string. */
private def parseStruct(struct: Struct): String = {
if (struct == null) return null
val map = struct.schema().fields().asScala.map(field => {
val v = struct.get(field)
val typ = field.schema().name()
val value = v match {
case null => null
case l: java.lang.Long => convertLongToTime(l, typ)
case i: java.lang.Integer => convertIntToDate(i, typ)
case other => convertObjToTime(other, typ)
}
(field.name(), value)
}).filter(_._2 != null).toMap
JSONObject.apply(map).toString()
}
/** Fallback conversion for temporal values that are not plain epoch numbers. */
private def convertObjToTime(obj: Any, typ: String): Any = {
typ match {
case Time.SCHEMA_NAME | MicroTime.SCHEMA_NAME | NanoTime.SCHEMA_NAME =>
sql.Time.valueOf(TemporalConversions.toLocalTime(obj)).toString
case Timestamp.SCHEMA_NAME | MicroTimestamp.SCHEMA_NAME | NanoTimestamp.SCHEMA_NAME | ZonedTimestamp.SCHEMA_NAME =>
sql.Timestamp.valueOf(TemporalConversions.toLocalDateTime(obj, ZoneId.of(serverTimeZone))).toString
case _ => obj
}
}
/** Convert an epoch-based long into a time or timestamp string. */
private def convertLongToTime(obj: Long, typ: String): Any = {
val time_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Time")
val date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date")
val timestamp_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Timestamp")
typ match {
case Time.SCHEMA_NAME =>
org.apache.kafka.connect.data.Time.toLogical(time_schema, obj.asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
case MicroTime.SCHEMA_NAME =>
org.apache.kafka.connect.data.Time.toLogical(time_schema, (obj / 1000).asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
case NanoTime.SCHEMA_NAME =>
org.apache.kafka.connect.data.Time.toLogical(time_schema, (obj / 1000 / 1000).asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
case Timestamp.SCHEMA_NAME =>
val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
java.sql.Timestamp.valueOf(t).toString
case MicroTimestamp.SCHEMA_NAME =>
val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
java.sql.Timestamp.valueOf(t).toString
case NanoTimestamp.SCHEMA_NAME =>
val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000 / 1000).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
java.sql.Timestamp.valueOf(t).toString
case Date.SCHEMA_NAME =>
org.apache.kafka.connect.data.Date.toLogical(date_schema, obj.asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDate.toString
case _ => obj
}
}
private def convertIntToDate(obj:Int, typ: String): Any ={
val date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date")
typ match {
case Date.SCHEMA_NAME =>
org.apache.kafka.connect.data.Date.toLogical(date_schema, obj).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDate.toString
case _ => obj
}
}
override def getProducedType: TypeInformation[Row] = {
TypeInformation.of(classOf[Row])
}
}
MySQL CDC timezone issues
MySQL CDC has the same timezone problem. Debezium converts MySQL datetime columns into UTC epoch timestamps (io.debezium.time.Timestamp) with a hard-coded timezone that cannot be changed, so a database running at UTC+8 produces long timestamps in Kafka that are 8 hours ahead. Debezium likewise converts MySQL timestamp columns into UTC strings.
There are two ways to solve this:
1. Do the timezone conversion in a custom deserialization schema, as shown above for PostgreSQL.
2. Write a custom temporal converter class and register it through the Debezium configuration.
The second approach is used here.
package com.zmn.schema;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.function.Consumer;
/**
* Works around Debezium's temporal conversions.
* Debezium converts MySQL datetime columns into UTC epoch timestamps ({@link io.debezium.time.Timestamp}) with a
* hard-coded timezone that cannot be changed, so a database running at UTC+8 ends up with long timestamps in Kafka
* that are 8 hours ahead. Debezium converts MySQL timestamp columns into UTC strings.
* | mysql | mysql-binlog-connector | debezium |
* | ----------------------------------- | ---------------------------------------- | --------------------------------- |
* | date<br/>(2021-01-28) | LocalDate<br/>(2021-01-28) | Integer<br/>(18655) |
* | time<br/>(17:29:04) | Duration<br/>(PT17H29M4S) | Long<br/>(62944000000) |
* | timestamp<br/>(2021-01-28 17:29:04) | ZonedDateTime<br/>(2021-01-28T09:29:04Z) | String<br/>(2021-01-28T09:29:04Z) |
* | Datetime<br/>(2021-01-28 17:29:04) | LocalDateTime<br/>(2021-01-28T17:29:04) | Long<br/>(1611854944000) |
*
* @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
*/
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private final static Logger logger = LoggerFactory.getLogger(MySqlDateTimeConverter.class);
private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
private ZoneId timestampZoneId = ZoneId.systemDefault();
@Override
public void configure(Properties props) {
readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p));
readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z));
}
private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
String settingValue = (String) properties.get(settingKey);
if (settingValue == null || settingValue.length() == 0) {
return;
}
try {
callback.accept(settingValue.trim());
} catch (IllegalArgumentException | DateTimeException e) {
logger.error("The {} setting is illegal: {}",settingKey,settingValue);
throw e;
}
}
@Override
public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
String sqlType = column.typeName().toUpperCase();
SchemaBuilder schemaBuilder = null;
Converter converter = null;
if ("DATE".equals(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
converter = this::convertDate;
}
if ("TIME".equals(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
converter = this::convertTime;
}
if ("DATETIME".equals(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
converter = this::convertDateTime;
}
if ("TIMESTAMP".equals(sqlType)) {
schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
converter = this::convertTimestamp;
}
if (schemaBuilder != null) {
registration.register(schemaBuilder, converter);
}
}
private String convertDate(Object input) {
if (input instanceof LocalDate) {
return dateFormatter.format((LocalDate) input);
}
if (input instanceof Integer) {
LocalDate date = LocalDate.ofEpochDay((Integer) input);
return dateFormatter.format(date);
}
return null;
}
private String convertTime(Object input) {
if (input instanceof Duration) {
Duration duration = (Duration) input;
long seconds = duration.getSeconds();
int nano = duration.getNano();
LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
return timeFormatter.format(time);
}
return null;
}
private String convertDateTime(Object input) {
if (input instanceof LocalDateTime) {
return datetimeFormatter.format((LocalDateTime) input);
}
return null;
}
private String convertTimestamp(Object input) {
if (input instanceof ZonedDateTime) {
// MySQL stores timestamp columns in UTC; the ZonedDateTime here is therefore a UTC instant
ZonedDateTime zonedDateTime = (ZonedDateTime) input;
LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
return timestampFormatter.format(localDateTime);
}
return null;
}
}
Usage:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("snapshot.mode", "schema_only"); // 增量讀取
// custom temporal converter configuration
properties.setProperty("converters", "dateConverters");
properties.setProperty("dateConverters.type", "pg.cdc.ds.PgSQLDateTimeConverter");
properties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
properties.setProperty("dateConverters.format.time", "HH:mm:ss");
properties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
properties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
properties.setProperty("dateConverters.format.timestamp.zone", "UTC+8");
properties.setProperty("debezium.snapshot.locking.mode","none"); //全局讀寫鎖,可能會影響在線業(yè)務(wù),跳過鎖設(shè)置
properties.setProperty("include.schema.changes", "true");
// With Flink MySQL CDC, BIGINT UNSIGNED columns are captured as strings by default when using
// JsonDebeziumDeserializationSchema; map them to long instead.
properties.setProperty("bigint.unsigned.handling.mode","long");
properties.setProperty("decimal.handling.mode","double");
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("192.168.10.102")
.port(3306)
.username("yusys")
.password("yusys")
.databaseList("gmall")
.tableList("gmall.faker_user1")
.deserializer(new JsonDebeziumDeserializationSchema())
.debeziumProperties(properties)
.serverId("5409")
.build();
DataStreamSource<String> dataSource = env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "binlog-source")
.setParallelism(10);
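With the converter registered, temporal columns arrive in the JSON payload as formatted strings rather than raw epoch numbers. An illustrative before/after for a datetime column (field names are made up; the values come from the conversion table in the converter's Javadoc):
// without the converter: "create_time" is an io.debezium.time.Timestamp (epoch ms, wall clock encoded as UTC)
//   "after": {"id": 1, "create_time": 1611854944000}
// with the converter and format.datetime = yyyy-MM-dd HH:mm:ss:
//   "after": {"id": 1, "create_time": "2021-01-28 17:29:04"}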