Flink CDC Whole-Database Synchronization (Heterogeneous Multi-Table Sync)


Preface

Single-table sync with Flink CDC is fairly simple — following the official examples basically just works. Multi-table heterogeneous sync and whole-database sync are something I had wanted to try for a while. The community says it can be done with the API, but I never managed to find a ready-made working solution (code), so I tried it myself. My Java is, cough, not great, so it took quite a while and I ran into plenty of problems along the way. At some point I happened to see a sync solution a group member had posted long, long ago; unfortunately it was missing the deserialization part, so I borrowed it and kept tweaking it (for several weeks — yes, that slow) until it more or less runs. I am sharing it here; if it helps you even a little, that is good enough for me.

Approach

Let me outline the overall idea first (it is borrowed from another community member). First, use MySqlCatalog to read each table's metadata (column names, column types, primary key and so on) and create a corresponding Flink sink table for each one. Flink CDC's DataStream source is able to capture an entire database, so we read the change stream with the DataStream API and, in a custom deserializer, emit every record as a <tableName, Row> pair, giving a DataStream<Tuple2<String, Row>>. We then split (filter) that stream by tableName, so each table effectively gets its own DataStream, turn each per-table stream into a source table (a temporary view), and finally run insert into sinktable select * from sourcetable for every table. Then... game over. The sketch below shows roughly what gets generated for one table.
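To make the flow concrete, this is roughly what the job generates and executes for a single table. The table name nlp_category comes from the tableList used in the code below; its columns (id, name) are invented here purely for illustration, and the connection options are shown as placeholders for the configured sink settings:

create table jdbc_sink_nlp_category (
    id INT NOT NULL,
    name VARCHAR(255),
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'jdbc',
    'url' = '<sink_url>',
    'username' = '<sink_username>',
    'password' = '<sink_password>',
    'table-name' = 'jdbc_sink_nlp_category'
)

insert into jdbc_sink_nlp_category select * from t_nlp_category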

Let's get started:

Flink version: 1.15.2 (versions below 1.15 apparently do not ship MySqlCatalog yet; to use an older version the code would need some adjustment)
Flink CDC version: 2.3.0
Note: the corresponding tables must be created in the sink database beforehand (I forgot to mention this earlier).

Enough talk, here is the code. The scenario is MySQL -> MySQL; if the sink is a different database it should in principle work the same way. The source tables must have a primary key — that is a contract of the Flink CDC connector, and the job errors out without one. A sketch of the tables involved follows right below.
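For reference, a minimal sketch of the tables on both sides, assuming a table nlp_category with two made-up columns — the only hard requirement is the primary key. Note that, as the code is written, the generated sink DDL sets the JDBC 'table-name' option to jdbc_sink_<table>, so the physical table in the sink database needs that prefix (or adjust the ${tableName} replacement to use the bare table name):

-- source database: the table Flink CDC captures (a primary key is mandatory)
create table nlp_category (
    id int not null,
    name varchar(255),
    primary key (id)
);

-- sink database: create the matching table up front, named as the generated DDL expects
create table jdbc_sink_nlp_category (
    id int not null,
    name varchar(255),
    primary key (id)
);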

package com.cityos;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Flink CDC whole-database (multi-table) synchronization job.
 */
public class FlinkCdcMultiSyncJdbc {

    private static final Logger log = LoggerFactory.getLogger(FlinkCdcMultiSyncJdbc.class);

    public static void main(String[] args) throws Exception {

        // source connection info
        String userName = "root";
        String passWord = "18772247265Ldy@";
        String host = "localhost";
        String db = "flinktest1";
        // for whole-database sync, set tableList = ".*"; entries are expected in "db.table" form
        String tableList = "lidy.nlp_category,lidy.nlp_classify_man_made3";
        int port = 33306;

        // sink connection info template
        String sink_url = "jdbc:mysql://localhost:33306/flinktest?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai";
        String sink_username = "root";
        String sink_password = "18772247265Ldy@";

        String connectorWithBody =
                " with (\n" +
                        " 'connector' = 'jdbc',\n" +
                        " 'url' = '${sink_url}',\n" +
                        " 'username' = '${sink_username}',\n" +
                        " 'password' = '${sink_password}',\n" +
                        " 'table-name' = '${tableName}'\n" +
                        ")";
        connectorWithBody = connectorWithBody.replace("${sink_url}", sink_url)
                .replace("${sink_username}", sink_username)
                .replace("${sink_password}", sink_password);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // register a catalog for the source database
        MySqlCatalog mysqlCatalog = new MySqlCatalog("mysql-catalog", db, userName, passWord, String.format("jdbc:mysql://%s:%d", host, port));
        List<String> tables = new ArrayList<>();

        // for whole-database sync, list all tables from the catalog; otherwise take the table names from tableList
        if (".*".equals(tableList)) {
            tables = mysqlCatalog.listTables(db);
        } else {
            String[] tableArray = tableList.split(",");
            for (String table : tableArray) {
                tables.add(table.split("\\.")[1]);
            }
        }
        // maps from table name to RowTypeInfo / DataType[] / RowType
        Map<String, RowTypeInfo> tableTypeInformationMap = Maps.newConcurrentMap();
        Map<String, DataType[]> tableDataTypesMap = Maps.newConcurrentMap();
        Map<String, RowType> tableRowTypeMap = Maps.newConcurrentMap();
        for (String table : tables) {
            // look up the table registered in the MySQL catalog
            ObjectPath objectPath = new ObjectPath(db, table);
            DefaultCatalogTable catalogBaseTable = (DefaultCatalogTable) mysqlCatalog.getTable(objectPath);
            // unresolved schema of the table
            Schema schema = catalogBaseTable.getUnresolvedSchema();
            // column names
            String[] fieldNames = new String[schema.getColumns().size()];
            // column DataTypes and LogicalTypes
            DataType[] fieldDataTypes = new DataType[schema.getColumns().size()];
            LogicalType[] logicalTypes = new LogicalType[schema.getColumns().size()];
            // column TypeInformation
            TypeInformation<?>[] fieldTypes = new TypeInformation[schema.getColumns().size()];
            // primary key columns
            List<String> primaryKeys = schema.getPrimaryKey().get().getColumnNames();

            for (int i = 0; i < schema.getColumns().size(); i++) {
                Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) schema.getColumns().get(i);
                fieldNames[i] = column.getName();
                fieldDataTypes[i] = (DataType) column.getDataType();
                fieldTypes[i] = InternalTypeInfo.of(((DataType) column.getDataType()).getLogicalType());
                logicalTypes[i] = ((DataType) column.getDataType()).getLogicalType();
            }
            RowType rowType = RowType.of(logicalTypes, fieldNames);
            tableRowTypeMap.put(table, rowType);

            // assemble the sink table DDL
            StringBuilder stmt = new StringBuilder();
            String tableName = table;
            String jdbcSinkTableName = String.format("jdbc_sink_%s", tableName);
            stmt.append("create table ").append(jdbcSinkTableName).append("(\n");

            for (int i = 0; i < fieldNames.length; i++) {
                String column = fieldNames[i];
                String fieldDataType = fieldDataTypes[i].toString();
                stmt.append("\t").append(column).append(" ").append(fieldDataType).append(",\n");
            }
            stmt.append(String.format("PRIMARY KEY (%s) NOT ENFORCED\n)", StringUtils.join(primaryKeys, ",")));
            String formatJdbcSinkWithBody = connectorWithBody
                    .replace("${tableName}", jdbcSinkTableName);
            String createSinkTableDdl = stmt.toString() + formatJdbcSinkWithBody;
            // create the sink table
            log.info("createSinkTableDdl: {}", createSinkTableDdl);
            tEnv.executeSql(createSinkTableDdl);
            tableDataTypesMap.put(tableName, fieldDataTypes);
            tableTypeInformationMap.put(tableName, new RowTypeInfo(fieldTypes, fieldNames));
        }

        // MySQL binlog source
        MySqlSource<Tuple2<String, Row>> mySqlSource = MySqlSource.<Tuple2<String, Row>>builder()
                .hostname(host)
                .port(port)
                .databaseList(db)
                .tableList(tableList)
                .username(userName)
                .password(passWord)
                .deserializer(new CustomDebeziumDeserializer(tableRowTypeMap))
                .startupOptions(StartupOptions.initial())
                .build();
        SingleOutputStreamOperator<Tuple2<String, Row>> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql cdc").disableChaining();
        StatementSet statementSet = tEnv.createStatementSet();
        // convert each per-table stream into a Table, register it as a temporary view, and insert into the sink table
        for (Map.Entry<String, RowTypeInfo> entry : tableTypeInformationMap.entrySet()) {
            String tableName = entry.getKey();
            RowTypeInfo rowTypeInfo = entry.getValue();
            SingleOutputStreamOperator<Row> mapStream = dataStreamSource.filter(data -> data.f0.equals(tableName)).map(data -> data.f1, rowTypeInfo);
            Table table = tEnv.fromChangelogStream(mapStream);
            String temporaryViewName = String.format("t_%s", tableName);
            tEnv.createTemporaryView(temporaryViewName, table);
            String sinkTableName = String.format("jdbc_sink_%s", tableName);
            String insertSql = String.format("insert into %s select * from %s", sinkTableName, temporaryViewName);
            log.info("add insertSql for {},sql: {}", tableName, insertSql);
            statementSet.addInsertSql(insertSql);
        }
        statementSet.execute();
    }
}

The corresponding custom deserializer:

package com.cityos;


import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.time.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;


public class CustomDebeziumDeserializer implements DebeziumDeserializationSchema<Tuple2<String, Row>> {


    /** RowType of each captured table, keyed by table name. */
    private final Map<String, RowType> tableRowTypeMap;

    /**
     * Runtime converters that turn Debezium {@link Struct} values into Flink {@link Row}s of
     * physical column values, keyed by table name.
     */
    private final Map<String, DeserializationRuntimeConverter> physicalConverterMap = Maps.newConcurrentMap();

    CustomDebeziumDeserializer(Map<String, RowType> tableRowTypeMap) {
        this.tableRowTypeMap = tableRowTypeMap;
        // pre-build one runtime converter per table from its RowType
        for (String tablename : this.tableRowTypeMap.keySet()) {
            RowType rowType = this.tableRowTypeMap.get(tablename);
            DeserializationRuntimeConverter physicalConverter = createNotNullConverter(rowType);
            this.physicalConverterMap.put(tablename, physicalConverter);
        }
    }

    @Override
    public void deserialize(SourceRecord record, Collector<Tuple2<String, Row>> out) throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
        // the Debezium "source" block carries the table name; pick the matching converter
        Struct source = value.getStruct("source");
        String tablename = source.get("table").toString();
        DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tablename);
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            Row insert = extractAfterRow(value, valueSchema, physicalConverter);
            insert.setKind(RowKind.INSERT);
            out.collect(Tuple2.of(tablename,insert));
        } else if (op == Envelope.Operation.DELETE) {
            Row delete = extractBeforeRow(value, valueSchema, physicalConverter);
            delete.setKind(RowKind.DELETE);
            out.collect(Tuple2.of(tablename,delete));
        } else {
            Row before = extractBeforeRow(value, valueSchema, physicalConverter);
            before.setKind(RowKind.UPDATE_BEFORE);
            out.collect(Tuple2.of(tablename,before));

            Row after = extractAfterRow(value, valueSchema, physicalConverter);
            after.setKind(RowKind.UPDATE_AFTER);
            out.collect(Tuple2.of(tablename,after));
        }
    }

    private Row extractAfterRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception {
        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
        Struct after = value.getStruct(Envelope.FieldName.AFTER);
        return (Row) physicalConverter.convert(after, afterSchema);
    }

    private Row extractBeforeRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception {
        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
        Struct before = value.getStruct(Envelope.FieldName.BEFORE);
        return (Row) physicalConverter.convert(before, beforeSchema);
    }


    @Override
    public TypeInformation<Tuple2<String, Row>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, Row>>() {
        });
    }

    public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case NULL:
                return new DeserializationRuntimeConverter() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return null;
                    }
                };
            case BOOLEAN:
                return convertToBoolean();
            case TINYINT:
                return new DeserializationRuntimeConverter() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return Byte.parseByte(dbzObj.toString());
                    }
                };
            case SMALLINT:
                return new DeserializationRuntimeConverter() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return Short.parseShort(dbzObj.toString());
                    }
                };
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
                return convertToInt();
            case BIGINT:
            case INTERVAL_DAY_TIME:
                return convertToLong();
            case DATE:
                return convertToDate();
            case TIME_WITHOUT_TIME_ZONE:
                return convertToTime();
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return convertToTimestamp(ZoneId.of("UTC"));
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));
            case FLOAT:
                return convertToFloat();
            case DOUBLE:
                return convertToDouble();
            case CHAR:
            case VARCHAR:
                return convertToString();
            case BINARY:
            case VARBINARY:
                return convertToBinary();
            case DECIMAL:
                return createDecimalConverter((DecimalType) type);
            case ROW:
                return createRowConverter((RowType) type);
            case ARRAY:
            case MAP:
            case MULTISET:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }

    private static DeserializationRuntimeConverter convertToBoolean() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Boolean) {
                    return dbzObj;
                } else if (dbzObj instanceof Byte) {
                    return (byte) dbzObj == 1;
                } else if (dbzObj instanceof Short) {
                    return (short) dbzObj == 1;
                } else {
                    return Boolean.parseBoolean(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToInt() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return dbzObj;
                } else if (dbzObj instanceof Long) {
                    return ((Long) dbzObj).intValue();
                } else {
                    return Integer.parseInt(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToLong() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return ((Integer) dbzObj).longValue();
                } else if (dbzObj instanceof Long) {
                    return dbzObj;
                } else {
                    return Long.parseLong(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
        final int precision = decimalType.getPrecision();
        final int scale = decimalType.getScale();
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                BigDecimal bigDecimal;
                if (dbzObj instanceof byte[]) {
                    // decimal.handling.mode=precise
                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
                } else if (dbzObj instanceof String) {
                    // decimal.handling.mode=string
                    bigDecimal = new BigDecimal((String) dbzObj);
                } else if (dbzObj instanceof Double) {
                    // decimal.handling.mode=double
                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);
                } else {
                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
                        SpecialValueDecimal decimal =
                                VariableScaleDecimal.toLogical((Struct) dbzObj);
                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
                    } else {
                        // fallback to string
                        bigDecimal = new BigDecimal(dbzObj.toString());
                    }
                }
                return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
            }
        };
    }

    private static DeserializationRuntimeConverter convertToDouble() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return ((Float) dbzObj).doubleValue();
                } else if (dbzObj instanceof Double) {
                    return dbzObj;
                } else {
                    return Double.parseDouble(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToFloat() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return dbzObj;
                } else if (dbzObj instanceof Double) {
                    return ((Double) dbzObj).floatValue();
                } else {
                    return Float.parseFloat(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToDate() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
            }
        };
    }

    private static DeserializationRuntimeConverter convertToTime() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case MicroTime.SCHEMA_NAME:
                            return (int) ((long) dbzObj / 1000);
                        case NanoTime.SCHEMA_NAME:
                            return (int) ((long) dbzObj / 1000_000);
                    }
                } else if (dbzObj instanceof Integer) {
                    return dbzObj;
                }
                // get number of milliseconds of the day
                return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
            }
        };
    }

    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case Timestamp.SCHEMA_NAME:
                            return TimestampData.fromEpochMillis((Long) dbzObj);
                        case MicroTimestamp.SCHEMA_NAME:
                            long micro = (long) dbzObj;
                            return TimestampData.fromEpochMillis(
                                    micro / 1000, (int) (micro % 1000 * 1000));
                        case NanoTimestamp.SCHEMA_NAME:
                            long nano = (long) dbzObj;
                            return TimestampData.fromEpochMillis(
                                    nano / 1000_000, (int) (nano % 1000_000));
                    }
                }
                LocalDateTime localDateTime =
                        TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
                return TimestampData.fromLocalDateTime(localDateTime);
            }
        };
    }

    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
            ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof String) {
                    String str = (String) dbzObj;
                    // TIMESTAMP_LTZ type is encoded in string type
                    Instant instant = Instant.parse(str);
                    return TimestampData.fromLocalDateTime(
                            LocalDateTime.ofInstant(instant, serverTimeZone));
                }
                throw new IllegalArgumentException(
                        "Unable to convert to TimestampData from unexpected value '"
                                + dbzObj
                                + "' of type "
                                + dbzObj.getClass().getName());
            }
        };
    }

    private static DeserializationRuntimeConverter convertToString() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return StringData.fromString(dbzObj.toString());
            }
        };
    }

    private static DeserializationRuntimeConverter convertToBinary() {
        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof byte[]) {
                    return dbzObj;
                } else if (dbzObj instanceof ByteBuffer) {
                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    return bytes;
                } else {
                    throw new UnsupportedOperationException(
                            "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
        final DeserializationRuntimeConverter[] fieldConverters =
                rowType.getFields().stream()
                        .map(RowType.RowField::getType)
                        .map(CustomDebeziumDeserializer::createNotNullConverter)
                        .toArray(DeserializationRuntimeConverter[]::new);
        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);

        return new DeserializationRuntimeConverter() {

            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) throws Exception {
                Struct struct = (Struct) dbzObj;
                int arity = fieldNames.length;
                Row row = new Row(arity);
                for (int i = 0; i < arity; i++) {
                    String fieldName = fieldNames[i];
                    Field field = schema.field(fieldName);
                    if (field == null) {
                        row.setField(i, null);
                    } else {
                        Object fieldValue = struct.getWithoutDefault(fieldName);
                        Schema fieldSchema = schema.field(fieldName).schema();
                        Object convertedField =
                                convertField(fieldConverters[i], fieldValue, fieldSchema);
                        row.setField(i, convertedField);
                    }
                }
                return row;
            }
        };
    }

    private static Object convertField(
            DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
            throws Exception {
        if (fieldValue == null) {
            return null;
        } else {
            return fieldConverter.convert(fieldValue, fieldSchema);
        }
    }
}

And here is my pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cityos</groupId>
    <artifactId>flink_1_15</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
        <flink.version>1.15.2</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <!--        <scala.version>2.12.12</scala.version>-->
    </properties>

    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>

        <repository>
            <id>spring</id>
            <url>https://maven.aliyun.com/repository/spring</url>
        </repository>

        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>


    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--        mysql-cdc-->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>

        <!--        mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
            <scope>compile</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.7.RELEASE</version>
                <configuration>
                    <mainClass>com.cityos.Flink1142Application</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Take a look if you are interested; if not, or if you find it beneath you, just scroll on by — please don't flame me, the code really is ugly.
