国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink動態(tài)ClickhouseSink+自動建表

這篇具有很好參考價值的文章主要介紹了Flink動態(tài)ClickhouseSink+自動建表。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

通過自定義注解的形式,對JdbcSink進行封裝,支持自動建表、自動拼接insert語句文章來源地址http://www.zghlxwxcb.cn/news/detail-536734.html

主類

package flink.security.sink;


import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * 通用clickhouse sink
 */
@Slf4j
public class SecurityClickHouseSink {
    public static <T> SinkFunction<T> getSink(Class<T> model) throws Exception {
        createTable(model);
        String sql = createSql(model);
        return JdbcSink.sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @SneakyThrows
                    @Override
                    public void accept(PreparedStatement preparedStatement, T t) throws SQLException {
                        Class<?> obj = t.getClass();
                        Field[] declaredFields = obj.getDeclaredFields();
                        Stream<Field> usedFields = getUsedFields(declaredFields);
                        List<Field> fields = usedFields.collect(Collectors.toList());
                        for (int i = 0; i < fields.size(); i++) {
                            fields.get(i).setAccessible(true);
                            Object o = fields.get(i).get(t);
                            if (o instanceof Boolean) {
                                preparedStatement.setInt(i + 1, (Boolean) o ? 1 : 0);
                            } else if (o instanceof List) {
                                preparedStatement.setString(i + 1, JSONUtil.toJsonStr(o));
                            } else if (o instanceof Date) {
                                preparedStatement.setString(i + 1, DateFormatUtil.toYmdHms(((Date) o).getTime()));
                            } else {
                                preparedStatement.setObject(i + 1, o);
                            }
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(10000)
                        .withBatchIntervalMs(1000L * 10)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName(Constant.CLICKHOUSE_DRIVER_NAME)
                        .withUrl(Constant.CLICKHOUSE_URL)
                        .build()
        );
    }

    /**
     * 組裝sql
     */
    private static <T> String createSql(Class<T> model) {
        // 1、獲取表名
        SecurityTable tableAnno = model.getAnnotation(SecurityTable.class);
        if (tableAnno == null) {
            throw new RuntimeException("缺少SecurityTable注解");
        }
        String tableName = tableAnno.value().trim();
        // 2、獲取字段名
        Field[] declaredFields = model.getDeclaredFields();
        // 2.1 過濾掉帶SecurityField注解且isTableFile=false的字段
        Stream<Field> fields = getUsedFields(declaredFields);
        // 3.2 獲取字段
        List<String> fieldnames = fields.map(SecurityClickHouseSink::getTableFiledName).collect(Collectors.toList());
        // 3、獲取占位符
        List<String> placeholder = fieldnames.stream().map(f -> "?").collect(Collectors.toList());

        if (ObjectUtil.isEmpty(placeholder) || ObjectUtil.isEmpty(fieldnames)) {
            throw new RuntimeException("缺少必要的表字段!");
        }
        // 分布式表需要插入視圖
        String distributed = tableAnno.distributed();
        return "insert into " + tableName + (StringUtils.isBlank(distributed) ? "" : "_mr") + "(" + StringUtils.join(fieldnames, ",") + ") values(" + StringUtils.join(placeholder, ",") + ")";
    }

    /**
     * 組裝sql
     */
    private static <T> void createTable(Class<T> model) throws Exception {
        // 1、獲取表名
        SecurityTable tableAnno = model.getAnnotation(SecurityTable.class);
        if (tableAnno == null) {
            throw new RuntimeException("缺少SecurityTable注解");
        }
        String tableName = tableAnno.value().trim();
        String engine = tableAnno.engine().trim();
        String distributed = tableAnno.distributed().trim();
        String cluster = tableAnno.cluster().trim();
        // 2、獲取字段名
        Field[] declaredFields = model.getDeclaredFields();
        // 2.1 過濾掉帶SecurityField注解且isTableFile=false的字段
        Stream<Field> fields = getUsedFields(declaredFields);
        // 3.2 獲取字段
        List<String> fieldnames = fields.map(f -> {
            String wordType = f.getType().getName();
            SecurityField annotation = f.getAnnotation(SecurityField.class);
            switch (wordType) {
                case "java.lang.String":
                    wordType = "String";
                    break;
                case "java.util.Date":
                case "java.time.LocalDate":
                case "java.time.LocalDateTime":
                    wordType = "Datetime";
                    break;
                case "java.util.List":
                    wordType = "String";
                    break;
                case "boolean":
                    wordType = "UInt8";
                    break;
                case "java.lang.Double":
                    wordType = "Decimal(15,3)";
                    break;
                case "java.lang.Integer":
                    wordType = "Int32";
                    break;
                case "java.lang.Long":
                    wordType = "Int64";
                    break;
                default:
                    wordType = "String";
            }
            if (annotation == null) {
                return f.getName() + " " + wordType + " comment ''";
            } else {
                String comment = Optional.of(annotation.comment()).orElse("").replace("'", "`").replace("'", "`");
                return (StringUtils.isBlank(annotation.value()) ? f.getName() : annotation.value()) + " " + wordType + " comment '" + comment + "'";
            }
        }).collect(Collectors.toList());

        if (ObjectUtil.isEmpty(fieldnames)) {
            throw new RuntimeException("缺少必要的表字段!");
        }

        String cols = StringUtils.join(fieldnames, ",\n");

        try(Connection connection = DriverManager.getConnection(Constant.CLICKHOUSE_URL, Constant.CLICKHOUSE_USERNAME, Constant.CLICKHOUSE_PASSWORD);) {
            String localtable = String.format("create table if not exists  %s %s (%s) engine = %s", tableName, cluster, cols, engine);
            log.error(localtable);
            try (PreparedStatement preparedStatement = connection.prepareStatement(localtable)) {
                preparedStatement.executeUpdate();
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException("建表失敗請查看建表語句:\r\n" + localtable);
            }
            if (StringUtils.isNotBlank(distributed)) {
//                localtable = "CREATE TABLE default.PowerConsumption ON CLUSTER default_cluster AS default.PowerConsumption_local\n" +
//                        "ENGINE = Distributed(default_cluster, default, PowerConsumption_local, rand());";
                localtable = String.format("create table if not exists  %s_mr  %s AS %s engine = %s", tableName, cluster, tableName, distributed);
                log.error(localtable);
                try (PreparedStatement preparedStatement = connection.prepareStatement(localtable)) {
                    preparedStatement.executeUpdate();
                } catch (Exception e) {
                    e.printStackTrace();
                    throw new RuntimeException("建表失敗請查看建表語句:\r\n" + localtable);
                }
            }
        }
    }

    private static String getTableFiledName(Field field) {
        SecurityField annotation = field.getAnnotation(SecurityField.class);
        if (annotation == null) {
            return field.getName();
        } else {
            String value = annotation.value();
            if (StringUtils.isBlank(value)) {
                return field.getName();
            }
            return value;
        }
    }

    private static Stream<Field> getUsedFields(Field[] declaredFields) {
        return Arrays.stream(declaredFields).filter(field -> {
            SecurityField annotation = field.getAnnotation(SecurityField.class);
            return annotation == null || annotation.isTableField();
        });
    }

}

?自定義注解

package cn.chinaunicom.sdsi.flink.security.anno;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SecurityField {
    String value() default "";
    boolean isTableField() default true;
    String comment() default "''";
}
package cn.chinaunicom.sdsi.flink.security.anno;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface SecurityTable {
    String value();
    String engine() default "";
    String distributed() default "";
    String cluster() default "";
}

使用方式

SingleOutputStreamOperator<SecurityAlarms> process = attack.connect(broadcastStream).process(new BroadcastProcessFunction<SecurityAlarms, Co...

SinkFunction<SecurityAlarms> sink = SecurityClickHouseSink.getSink(SecurityAlarms.class);
process.addSink(sink);

Bean的定義

@Data
@SecurityTable(value = "security_data_center.security_alarms",
        cluster = "on cluster cluster_p2_r2_2345",
        engine = "ReplicatedReplacingMergeTree partition by (toYYYYMMDD(collectorReceiptTime)) order by (collectorReceiptTime,eventId)",
        distributed = "Distributed(cluster_p2_r2_2345,security_data_center,security_alarms,rand())")
public class SecurityAlarms extends CommonFileds{

    @SecurityField(comment = "數(shù)據(jù)接收來源【數(shù)據(jù)接收來源(安恒云)】")
    private String dispatchSource;
    @SecurityField(comment = "活動目錄唯一標(biāo)識【活動目錄唯一標(biāo)識】")
    private String DN;
    @SecurityField(comment = "DPI告警類型【DPI數(shù)據(jù)告警類型】")
    private String DPIAlertType;
}

到了這里,關(guān)于Flink動態(tài)ClickhouseSink+自動建表的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 用flink cdc sqlserver 將數(shù)據(jù)實時同步到clickhouse

    flink cdc 終于支持 sqlserver 了。 現(xiàn)在互聯(lián)網(wǎng)公司用sqlserver的不多,大部分都是一些國企的老舊系統(tǒng)。我們以前同步數(shù)據(jù),都是用datax,但是不能實時同步數(shù)據(jù)。現(xiàn)在有了flinkcdc,可以實現(xiàn)實時同步了。 1、首先sqlserver版本:要求sqlserver版本為14及以上,也就是 SQL Server 2017 版。

    2023年04月08日
    瀏覽(32)
  • XL-LightHouse 與 Flink 和 ClickHouse 流式大數(shù)據(jù)統(tǒng)計系統(tǒng)

    XL-LightHouse 與 Flink 和 ClickHouse 流式大數(shù)據(jù)統(tǒng)計系統(tǒng)

    一個Flink任務(wù)只能并行處理一個或少數(shù)幾個數(shù)據(jù)流,而XL-LightHouse一個任務(wù)可以并行處理數(shù)萬個、幾十萬個數(shù)據(jù)流; 一個Flink任務(wù)只能實現(xiàn)一個或少數(shù)幾個數(shù)據(jù)指標(biāo),而XL-LightHouse單個任務(wù)就能支撐大批量、數(shù)以萬計的數(shù)據(jù)指標(biāo)。 1、XL-LightHouse : ?1、再也不需要用 Flink、Spark、

    2024年02月09日
    瀏覽(61)
  • 十分鐘掌握 Flink CDC,實現(xiàn)Mysql數(shù)據(jù)增量備份到Clickhouse [純干貨,建議收藏]

    十分鐘掌握 Flink CDC,實現(xiàn)Mysql數(shù)據(jù)增量備份到Clickhouse [純干貨,建議收藏]

    Clickhouse的優(yōu)點. 真正的面向列的 DBMS ClickHouse 是一個 DBMS,而不是一個單一的數(shù)據(jù)庫。它允許在運行時創(chuàng)建表和數(shù)據(jù)庫、加載數(shù)據(jù)和運行 查詢,而無需重新配置和重新啟動服務(wù)器。 數(shù)據(jù)壓縮 一些面向列的 DBMS(InfiniDB CE 和 MonetDB)不使用數(shù)據(jù)壓縮。但是,數(shù)據(jù)壓縮確實提高了

    2024年04月14日
    瀏覽(95)
  • 大數(shù)據(jù)Flink(七十):SQL 動態(tài)表 & 連續(xù)查詢

    大數(shù)據(jù)Flink(七十):SQL 動態(tài)表 & 連續(xù)查詢

    文章目錄 SQL 動態(tài)表 連續(xù)查詢 一、???????SQL 應(yīng)用于流處理的思路

    2024年02月10日
    瀏覽(19)
  • flink-cdc,clickhouse寫入,多路輸出

    kafka日志數(shù)據(jù)從kafka讀取 1、關(guān)聯(lián)字典表:完善日志數(shù)據(jù) 2、判斷日志內(nèi)容級別:多路輸出 低級:入clickhouse 高級:入clickhouse的同時推送到kafka供2次數(shù)據(jù)流程處理。

    2024年02月09日
    瀏覽(24)
  • 【flink番外篇】3、flink的source(內(nèi)置、mysql、kafka、redis、clickhouse)介紹及示例(3)- kafka

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(26)
  • 12、Flink source和sink 的 clickhouse 詳細示例

    12、Flink source和sink 的 clickhouse 詳細示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月13日
    瀏覽(18)
  • Spring SpEL在Flink中的應(yīng)用-與Filter結(jié)合實現(xiàn)數(shù)據(jù)動態(tài)分流

    SpEL表達式與Flink fiter結(jié)合可以實現(xiàn)基于表達式的靈活動態(tài)過濾。有關(guān)SpEL表達式的使用請參考 Spring SpEL在Flink中的應(yīng)用-SpEL詳解 。 可以將過濾規(guī)則放入數(shù)據(jù)庫,根據(jù)不同的數(shù)據(jù)設(shè)置不同的過濾表達式,從而實現(xiàn)只需修改過濾表達式不用修改Flink代碼的功能。對于基于Flink進行數(shù)

    2024年01月25日
    瀏覽(12)
  • 【flink番外篇】4、flink的sink(內(nèi)置、mysql、kafka、redis、clickhouse、分布式緩存、廣播變量)介紹及示例(8) - 完整版

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月04日
    瀏覽(31)
  • 【flink番外篇】4、flink的sink(內(nèi)置、mysql、kafka、redis、clickhouse、分布式緩存、廣播變量)介紹及示例(5) - kafka

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(34)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包