国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink streamload寫入doris

這篇具有很好參考價值的文章主要介紹了flink streamload寫入doris。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

flink 1.13 streamload寫入doris

前言

官方教程詳細介紹了基于flink 1.16的各種寫入方式,本文主要介紹的是基于flink 1.13的RowData 數(shù)據(jù)流(RowDataSerializer)寫入文章來源地址http://www.zghlxwxcb.cn/news/detail-758109.html

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.1</flink.version>
    </properties>


        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.13_2.12</artifactId>
            <version>1.0.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

一、目標表

-- doris sink table 
CREATE TABLE IF NOT EXISTS events (
    `user_id` INT NOT NULL COMMENT '用戶id',
    `date` DATE COMMENT '接收日期',
    `event_type` varchar(256)  COMMENT '事件類型',
    `province` varchar(128)  COMMENT '省份',
    `city` varchar(128)  COMMENT '城市'
    `receive_time` DATETIME
)
DUPLICATE KEY(`user_id`, `receive_date`)
COMMENT '事件表'
PARTITION BY RANGE (`receive_date`)
(
FROM
    ("2023-01-01") TO ("2023-09-14") INTERVAL 1 DAY
)
DISTRIBUTED BY HASH (`user_id`) BUCKETS 1
PROPERTIES (
   "replication_allocation" = "tag.location.default: 1",
   "compression"="LZ4",
   "dynamic_partition.enable" = "true",
   "dynamic_partition.time_unit" = "DAY",
   "dynamic_partition.end" = "3",
   "dynamic_partition.prefix" = "p",
   "dynamic_partition.buckets" = "1"
);

二、map

public class EventMapFunction extends RichMapFunction<Event, RowData> {
    private static final Logger log = LoggerFactory.getLogger(EventMapFunction.class);

    @Override
    public RowData map(Event event) {
        GenericRowData genericRowData = new GenericRowData(6);
        try {
        //map 字段映射
            genericRowData.setField(0, event.getUserId());
            // 字符型需要轉化,否則會報錯
            genericRowData.setField(1, StringData.fromString(event.getReceiveDate()));
            genericRowData.setField(2, StringData.fromString(event.getEventType()));
            genericRowData.setField(3, StringData.fromString(event.getProvince()));
            genericRowData.setField(4, StringData.fromString(event.getCity()));
            genericRowData.setField(5, StringData.fromString(event.getReceiveTime()));

        } catch (Exception e) {
            log.error("Event data map error : " + e);
        }
        return genericRowData;
    }
}

三、DorisSink

public class DorisSinkUtil {
    public static SinkFunction<RowData> getDorisSink(String table, String labelPrefix) {
 		//寫入格式   
        Properties properties = new Properties();
        properties.setProperty("read_json_by_line", "true");
        properties.setProperty("format", "json");
        properties.setProperty("strip_outer_array", "true");

        SinkFunction<RowData> dorisSink = DorisSink.sink(getEventFields(),
                getEventDataType(),
                DorisReadOptions.builder().build(),
                DorisExecutionOptions.builder()
                        .setBatchSize(3)
                        .setBatchIntervalMs(0L)
                        .setMaxRetries(3)
                        .setStreamLoadProp(properties)
                        .build(),
                DorisOptions.builder()
                        .setFenodes(readValue("doris.fenodes"))
                        .setTableIdentifier(table)
                        .setUsername(readValue("doris.username"))
                        .setPassword(readValue("doris.password"))
                        .build());
          
        return dorisSink;
    }
	//字段及類型一一對應
    public static String[] getEventFields() {
        return new String[]{
                "user_id",
                "receive_date",
                "event_type",
                "province",
                "city",
                "receive_time"
        };
    }

    public static LogicalType[] getEventDataType() {
        return new LogicalType[]{
                new IntType(),
                new VarCharType(256),
                new VarCharType(256),
                new VarCharType(128),
                new VarCharType(128),
                new VarCharType(256)
        };
    }

}

四、job主類

    public static void main(String[] args) {

        try {
            /** 一、 創(chuàng)建flink流式執(zhí)行環(huán)境 */
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setAutoWatermarkInterval(0);

            /** 二、 獲取Source */
            KafkaSource<String> eventSource = getKafkaSource(
                    readValue("kafka.broker.id"),
                    readValue("kafka.event.topic"),
                    readValue("kafka.event.group.id"));
            /** 三、消費 Source */
            SingleOutputStreamOperator<String> eventSourceStream = env.fromSource(eventSource, WatermarkStrategy.noWatermarks(), "kafkaSource_event").setParallelism(12);

            /** 四、解析數(shù)據(jù) */
            SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = eventSourceStream.flatMap(new EventParseFlatMap()).setParallelism(12);


			/** 五、將java bean類型轉化為rowdata類型 */
            SingleOutputStreamOperator<RowData> eventStream =   eventSingleOutputStreamOperator   .map(new EventMapFunction()).setParallelism(12);
            

            /** 六、構建doris sink */
            SinkFunction<RowData> eventSink = DorisSinkUtil.getDorisSink(readValue("doris.table.event"), Constants.EVENT_LABEL_PREFIX);


            /** 七、輸出至doris */
            eventStream.addSink(eventSink).setParallelism(12);


            env.execute("EventJob");
        } catch (Exception e) {
            LOG.error("EventJob failed to activate", e);
        }
    }

到了這里,關于flink streamload寫入doris的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Flink實時同步MySQL與Doris數(shù)據(jù)

    Flink實時同步MySQL與Doris數(shù)據(jù)

    技術解析|Doris Connector 結合 Flink CDC 實現(xiàn) MySQL 分庫分表 Exactly Once 精準接入-阿里云開發(fā)者社區(qū) 1. Flink環(huán)境: https://flink.apache.org/zh/ 下載flink-1.15.1 解壓,修改配置 修改配置 修改rest.bind-address為 0.0.0.0 下載依賴jar包 至 flink安裝目錄lib下 啟動flink 訪問WebUI http://192.168.0.158:8081 2、

    2024年02月13日
    瀏覽(27)
  • Flink寫入數(shù)據(jù)到ClickHouse

    Flink寫入數(shù)據(jù)到ClickHouse

    1.ClickHouse建表 ClickHouse中建表 2.ClickHouse依賴 Flink開發(fā)相關依賴 3.Bean實體類 User.java 4.ClickHouse業(yè)務寫入邏輯 ClickHouseSinkFunction.java open():在SinkFunction實例化后調用,用于初始化連接或資源。這在處理每個并行任務的子任務之前只被調用一次。 invoke():定義了在每個元素到達Sink操

    2024年02月12日
    瀏覽(13)
  • Flink將數(shù)據(jù)寫入MySQL(JDBC)

    Flink將數(shù)據(jù)寫入MySQL(JDBC)

    在實際的生產(chǎn)環(huán)境中,我們經(jīng)常會把Flink處理的數(shù)據(jù)寫入MySQL、Doris等數(shù)據(jù)庫中,下面以MySQL為例,使用JDBC的方式將Flink的數(shù)據(jù)實時數(shù)據(jù)寫入MySQL。 2.1 版本說明 2.2 導入相關依賴 2.3 連接數(shù)據(jù)庫,創(chuàng)建表 2.4 創(chuàng)建POJO類 2.5 自定義map函數(shù) 2.5 Flink2MySQL 2.6 啟動necat、Flink,觀察數(shù)據(jù)庫寫

    2024年02月07日
    瀏覽(15)
  • 6.2、Flink數(shù)據(jù)寫入到Kafka

    6.2、Flink數(shù)據(jù)寫入到Kafka

    目錄 1、添加POM依賴 2、API使用說明 3、序列化器 3.1 使用預定義的序列化器 3.2 使用自定義的序列化器 4、容錯保證級別 4.1?至少一次 的配置 4.2?精確一次 的配置 5、這是一個完整的入門案例 Apache Flink 集成了通用的 Kafka 連接器,使用時需要根據(jù)生產(chǎn)環(huán)境的版本引入相應的依賴

    2024年02月09日
    瀏覽(15)
  • 怎么使用 Flink 向 Apache Doris 表中寫 Bitmap 類型的數(shù)據(jù)

    Bitmap是一種經(jīng)典的數(shù)據(jù)結構,用于高效地對大量的二進制數(shù)據(jù)進行壓縮存儲和快速查詢。Doris支持bitmap數(shù)據(jù)類型,在Flink計算場景中,可以結合Flink doris Connector對bitmap數(shù)據(jù)做計算。 社區(qū)里很多小伙伴在是Doris Flink Connector的時候,不知道怎么寫B(tài)itmap類型的數(shù)據(jù),本文將介紹如何

    2024年02月07日
    瀏覽(18)
  • 【Flink】【ClickHouse】寫入流式數(shù)據(jù)到ClickHouse

    【Flink】【ClickHouse】寫入流式數(shù)據(jù)到ClickHouse

    Flink 安裝的教程就不在這里贅敘了,可以看一下以前的文章,這篇文章主要是把流式數(shù)據(jù)寫入的OLAP(ClickHouse)中作查詢分析 Flink 1.13.2, ClickHouse?22.1.3.7 這里直接使用docker安裝,沒有安裝的同學可以使用homebreak來安裝,執(zhí)行下面的命令即可( 已經(jīng)安裝了docker的可以忽略 ) 四指

    2024年02月03日
    瀏覽(26)
  • 使用 Flink CDC 實現(xiàn) MySQL 數(shù)據(jù),表結構實時入 Apache Doris

    現(xiàn)有數(shù)據(jù)庫:mysql 數(shù)據(jù):庫表較多,每個企業(yè)用戶一個分庫,每個企業(yè)下的表均不同,無法做到聚合,且表可以被用戶隨意改動,增刪改列等,增加表 分析:用戶自定義分析,通過拖拽定義圖卡,要求實時,點擊確認即出現(xiàn)相應結果,其中有無法預判的過濾 問題:隨業(yè)務增長

    2023年04月08日
    瀏覽(24)
  • flink cdc同步Oracle數(shù)據(jù)庫資料到Doris問題集錦

    java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder at com.ververica.cdc.debezium.DebeziumSourceFunction.open(DebeziumSourceFunction.java:218) ~[flink-connector-debezium-2.2.0.jar:2.2.0] at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-co

    2024年02月16日
    瀏覽(22)
  • Flink將數(shù)據(jù)寫入CSV文件后文件中沒有數(shù)據(jù)

    Flink將數(shù)據(jù)寫入CSV文件后文件中沒有數(shù)據(jù)

    Flink中有一個過時的 sink 方法: writeAsCsv ,這個方法是將數(shù)據(jù)寫入 CSV 文件中,有時候我們會發(fā)現(xiàn)程序啟動后,打開文件查看沒有任何數(shù)據(jù),日志信息中也沒有任何報錯,這里我們結合源碼分析一下這個原因. 這里先看一下數(shù)據(jù)處理的代碼 代碼中我是使用的自定義數(shù)據(jù)源生產(chǎn)數(shù)據(jù)的方式

    2024年02月16日
    瀏覽(14)
  • Flink之FileSink將數(shù)據(jù)寫入parquet文件

    Flink之FileSink將數(shù)據(jù)寫入parquet文件

    在使用FileSink將數(shù)據(jù)寫入列式存儲文件中時必須使用 forBulkFormat ,列式存儲文件如 ORCFile 、 ParquetFile ,這里就以 ParquetFile 為例結合代碼進行說明. 在Flink 1.15.3 中是通過構造 ParquetWriterFactory 然后調用 forBulkFormat 方法將構造好的 ParquetWriterFactory 傳入,這里先講一下構造 ParquetWriterF

    2024年02月03日
    瀏覽(18)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包