国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink cdc同步mysql到starrocks(日期時(shí)間格式/時(shí)區(qū)處理)

這篇具有很好參考價(jià)值的文章主要介紹了Flink cdc同步mysql到starrocks(日期時(shí)間格式/時(shí)區(qū)處理)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

環(huán)境

flink 1.15.3(此時(shí)最新版本為1.16.1)
mysql 5.7+
starrocks 2.5.2

mysql同步表結(jié)構(gòu)

mysql中的timestamp字段是可以正常同步的,但是多了8小時(shí),設(shè)置了mysql鏈接屬性也沒(méi)效果

CREATE TABLE `temp_flink` (
  `id` int(11) NOT NULL,
  `name` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `remark` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `create_date` datetime DEFAULT NULL,
  `create_time` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

參考下方的鏈接有兩種方式;

這里使用單獨(dú)的轉(zhuǎn)換器代碼如下


import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 * mysql日期字段時(shí)區(qū)/格式處理
 * @author JGMa
 */
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;

    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;

    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;

    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;

    private ZoneId timestampZoneId = ZoneId.systemDefault();

    @Override
    public void configure(Properties props) {

    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {

        String sqlType = column.typeName().toUpperCase();

        SchemaBuilder schemaBuilder = null;

        Converter converter = null;

        if ("DATE".equals(sqlType)) {

            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");

            converter = this::convertDate;

        }

        if ("TIME".equals(sqlType)) {

            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");

            converter = this::convertTime;

        }

        if ("DATETIME".equals(sqlType)) {

            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");

            converter = this::convertDateTime;


        }

        if ("TIMESTAMP".equals(sqlType)) {

            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");

            converter = this::convertTimestamp;

        }

        if (schemaBuilder != null) {

            registration.register(schemaBuilder, converter);

        }

    }


    private String convertDate(Object input) {

        if (input == null) {
            return null;
        }

        if (input instanceof LocalDate) {

            return dateFormatter.format((LocalDate) input);

        }

        if (input instanceof Integer) {

            LocalDate date = LocalDate.ofEpochDay((Integer) input);

            return dateFormatter.format(date);

        }

        return String.valueOf(input);

    }


    private String convertTime(Object input) {

        if (input == null) {
            return null;
        }

        if (input instanceof Duration) {

            Duration duration = (Duration) input;

            long seconds = duration.getSeconds();

            int nano = duration.getNano();

            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);

            return timeFormatter.format(time);

        }

        return String.valueOf(input);

    }


    private String convertDateTime(Object input) {

        if (input == null) {
            return null;
        }

        if (input instanceof LocalDateTime) {

            return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " ");

        }

        return String.valueOf(input);

    }


    private String convertTimestamp(Object input) {

        if (input == null) {
            return null;
        }

        if (input instanceof ZonedDateTime) {

            // mysql的timestamp會(huì)轉(zhuǎn)成UTC存儲(chǔ),這里的zonedDatetime都是UTC時(shí)間

            ZonedDateTime zonedDateTime = (ZonedDateTime) input;

            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();

            return timestampFormatter.format(localDateTime).replaceAll("T", " ");

        }
        return String.valueOf(input);
    }
}

使用

{

    public static void main(String[] args) {
        String tableName = "temp_flink";
        String srcHost = "192.168.10.14";
        String srcDatabase = "xcode";
        String srcUsername = "root";
        String srcPassword = "123456";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        Properties mysqlProperties = new Properties();
//        mysqlProperties.setProperty("characterEncoding","UTF-8");
//        mysqlProperties.setProperty("connectionTimeZone","Asia/Shanghai");
        //自定義時(shí)間轉(zhuǎn)換配置
        mysqlProperties.setProperty("converters", "dateConverters");
        mysqlProperties.setProperty("dateConverters.type", "com.txlc.flink.core.MySqlDateTimeConverter");

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(srcHost)
//                .jdbcProperties(mysqlProperties)
                .port(3306)
                .databaseList(srcDatabase)
                .tableList(srcDatabase + "." + tableName)
                .username(srcUsername)
                .password(srcPassword)
//                .serverTimeZone("Asia/Shanghai")
// 主要是這里
                .debeziumProperties(mysqlProperties)
                .deserializer(new JsonStringDebeziumDeserializationSchema())
                .build();

        DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[temp_flink-source]")
                .setParallelism(1);

        streamSource.addSink(StarRocksSink.sink(
                // the sink options
                StarRocksSinkOptions.builder()
                        .withProperty("jdbc-url", "jdbc:mysql://192.168.10.245:9030?characterEncoding=utf-8")
                        .withProperty("load-url", "192.168.10.245:8030")
                        .withProperty("database-name", "xcode")
                        .withProperty("username", "root")
                        .withProperty("password", "123456")
                        .withProperty("table-name", tableName)
                        // 自 2.4 版本,支持更新主鍵模型中的部分列。您可以通過(guò)以下兩個(gè)屬性指定需要更新的列。
                        // .withProperty("sink.properties.partial_update", "true")
                        // .withProperty("sink.properties.columns", "k1,k2,k3")
                        .withProperty("sink.properties.format", "json")
                        .withProperty("sink.properties.strip_outer_array", "true")
                        // 設(shè)置并行度,多并行度情況下需要考慮如何保證數(shù)據(jù)有序性
                        .withProperty("sink.parallelism", "1")
                        .build())
        ).name(">>>StarRocks temp_flink Sink<<<");

        try {
            env.execute("temp_flink stream sync");
        } catch (Exception e) {
            e.printStackTrace();
            log.error("[sync error] info : {}", e);
        }


    }

}

參考資料
https://blog.csdn.net/cloudbigdata/article/details/122935333
https://blog.csdn.net/WuBoooo/article/details/127387144文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-596301.html

到了這里,關(guān)于Flink cdc同步mysql到starrocks(日期時(shí)間格式/時(shí)區(qū)處理)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Mysql+ETLCloud CDC+StarRocks實(shí)時(shí)數(shù)倉(cāng)同步實(shí)戰(zhàn)

    Mysql+ETLCloud CDC+StarRocks實(shí)時(shí)數(shù)倉(cāng)同步實(shí)戰(zhàn)

    大型企業(yè)需要對(duì)各種業(yè)務(wù)系統(tǒng)中的銷售及營(yíng)銷數(shù)據(jù)進(jìn)行實(shí)時(shí)同步分析,例如庫(kù)存信息、對(duì)帳信號(hào)、會(huì)員信息、廣告投放信息,生產(chǎn)進(jìn)度信息等等,這些統(tǒng)計(jì)分析信息可以實(shí)時(shí)同步到StarRocks中進(jìn)行分析和統(tǒng)計(jì),StarRocks作為分析型數(shù)據(jù)庫(kù)特別適合于對(duì)海量數(shù)據(jù)的存儲(chǔ)和分析,我們

    2024年02月16日
    瀏覽(22)
  • springboot集成starrocks、以及采用flink實(shí)現(xiàn)mysql與starrocks亞秒級(jí)同步

    (因采用dynamic-datasource-spring-boot-starter動(dòng)態(tài)數(shù)據(jù)源,所以才是以下配置文件的樣式,像redis,druid根據(jù)自己情況導(dǎo)入依賴) 這個(gè)配置文件的場(chǎng)景是把starrocks當(dāng)成slave庫(kù)在用。某些大數(shù)據(jù)慢查詢就走starrocks 就這樣配置好后就可把starrocks當(dāng)mysql用了 重點(diǎn):采用這種方式有限制,插入

    2024年01月21日
    瀏覽(17)
  • flink- mysql同步數(shù)據(jù)至starrocks-2.5.0之環(huán)境搭建

    一般需要以下幾個(gè)服務(wù): mysql flink flink-taskmanager flink-jobmanager starrocks starrocks-fe starrocks-be docker-compose.yml 配置文件 啟動(dòng): docker-compose up -d : 登陸 starrocks 注意事項(xiàng) mysql開(kāi)啟 bin log mysql 基于cdc ,采用 binlog模式,所以要開(kāi)啟binlog, conf/my.cnf : docker 服務(wù)放到同一docker網(wǎng)絡(luò)中 如果不在同

    2024年02月10日
    瀏覽(88)
  • Flink CDC MySQL同步MySQL錯(cuò)誤記錄

    Flink CDC MySQL同步MySQL錯(cuò)誤記錄

    0、相關(guān)Jar包 https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.16/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/ 或者從mvnrepository.com下載 https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc https://mvnrepository.com/artifact/org.apache.flink/flink-connector-

    2024年02月03日
    瀏覽(21)
  • 最新版Flink CDC MySQL同步MySQL(一)

    最新版Flink CDC MySQL同步MySQL(一)

    Flink CDC 是Apache Flink ?的一組源連接器,使用變更數(shù)據(jù)捕獲 (CDC) 從不同數(shù)據(jù)庫(kù)中獲取變更。Apache Flink 的 CDC Connectors集成 Debezium 作為捕獲數(shù)據(jù)更改的引擎。所以它可以充分發(fā)揮 Debezium 的能力。 連接器 數(shù)據(jù)庫(kù) 驅(qū)動(dòng) mongodb-cdc MongoDB: 3.6, 4.x, 5.0 MongoDB Driver: 4.3.4 mysql-cdc MySQL: 5.6, 5.

    2024年02月13日
    瀏覽(17)
  • 基于Flink CDC實(shí)時(shí)同步數(shù)據(jù)(MySQL到MySQL)

    基于Flink CDC實(shí)時(shí)同步數(shù)據(jù)(MySQL到MySQL)

    jdk8 Flink 1.16.1(部署在遠(yuǎn)程服務(wù)器:192.168.137.99) Flink CDC 2.3.0 MySQL 8.0(安裝在本地:192.168.3.31) (安裝部署過(guò)程略) 準(zhǔn)備三個(gè)數(shù)據(jù)庫(kù):flink_source、flink_sink、flink_sink_second。 將flink_source.source_test表實(shí)時(shí)同步到flink_sink和flink_sink_second的sink_test表。 (建庫(kù)建表過(guò)程略) 開(kāi)發(fā)過(guò)程

    2024年02月06日
    瀏覽(27)
  • Flink 實(shí)現(xiàn) MySQL CDC 動(dòng)態(tài)同步表結(jié)構(gòu)

    Flink 實(shí)現(xiàn) MySQL CDC 動(dòng)態(tài)同步表結(jié)構(gòu)

    作者:陳少龍,騰訊 CSIG 高級(jí)工程師 使用 Flink CDC(Change Data Capture) 實(shí)現(xiàn)數(shù)據(jù)同步被越來(lái)越多的人接受。本文介紹了在數(shù)據(jù)同步過(guò)程中,如何將 Schema 的變化實(shí)時(shí)地從 MySQL 中同步到 Flink 程序中去。 MySQL 存儲(chǔ)的數(shù)據(jù)量大了之后往往會(huì)出現(xiàn)查詢性能下降的問(wèn)題,這時(shí)候通過(guò) Flin

    2024年02月04日
    瀏覽(23)
  • Flink CDC 基于mysql binlog 實(shí)時(shí)同步mysql表

    Flink CDC 基于mysql binlog 實(shí)時(shí)同步mysql表

    環(huán)境說(shuō)明: flink?1.15.2 mysql 版本5.7? ? 注意:需要開(kāi)啟binlog,因?yàn)樵隽客绞腔赽inlog捕獲數(shù)據(jù) windows11 IDEA 本地運(yùn)行 先上官網(wǎng)使用說(shuō)明和案例:MySQL CDC Connector — Flink CDC documentation 1. mysql開(kāi)啟binlog (注意,引擎是 InnoDB,如果是ndbcluster,本人測(cè)試是捕獲不到binlog日志的,增量相

    2024年02月10日
    瀏覽(24)
  • 最新版Flink CDC MySQL同步MySQL(一)_flink 連接mysql(1)

    最新版Flink CDC MySQL同步MySQL(一)_flink 連接mysql(1)

    下載 連接器 SQL jar (或 自行構(gòu)建 )。 將下載的jar包放在FLINK_HOME/lib/. 重啟Flink集群。 注意 :目前2.4以上版本需要進(jìn)行自行編譯構(gòu)建。本文筆者自行進(jìn)行構(gòu)建上傳的 6.使用 Flink CDC 對(duì) MySQL 進(jìn)行流式 ETL 本教程將展示如何使用 Flink CDC 快速構(gòu)建 MySQL的流式 ETL。 假設(shè)我們將產(chǎn)品數(shù)

    2024年04月26日
    瀏覽(23)
  • 基于Flink SQL CDC Mysql to Mysql數(shù)據(jù)同步

    基于Flink SQL CDC Mysql to Mysql數(shù)據(jù)同步

    Flink CDC有兩種方式同步數(shù)據(jù)庫(kù): 一種是通過(guò)FlinkSQL直接輸入兩表數(shù)據(jù)庫(kù)映射進(jìn)行數(shù)據(jù)同步,缺點(diǎn)是只能單表進(jìn)行同步; 一種是通過(guò)DataStream開(kāi)發(fā)一個(gè)maven項(xiàng)目,打成jar包上傳到服務(wù)器運(yùn)行。 本方案使用FlinkSQL方法,同步兩表中的數(shù)據(jù)。 其中Flink應(yīng)用可以部署在具有公網(wǎng)IP的服務(wù)

    2023年04月11日
    瀏覽(27)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包