国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink之FileSink將數(shù)據(jù)寫入parquet文件

這篇具有很好參考價值的文章主要介紹了Flink之FileSink將數(shù)據(jù)寫入parquet文件。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Flink之FileSink將數(shù)據(jù)寫入parquet文件

在使用FileSink將數(shù)據(jù)寫入列式存儲文件中時必須使用forBulkFormat,列式存儲文件如ORCFile、ParquetFile,這里就以ParquetFile為例結(jié)合代碼進(jìn)行說明.

在Flink1.15.3中是通過構(gòu)造ParquetWriterFactory然后調(diào)用forBulkFormat方法將構(gòu)造好的ParquetWriterFactory傳入,這里先講一下構(gòu)造ParquetWriterFactory一共有三種方式

序列 API
方式一 AvroParquetWriters.forGenericRecord
方式二 AvroParquetWriters.forSpecificRecord
方式三 AvroParquetWriters.forReflectRecord

其中方式三AvroParquetWriters.forReflectRecord是我們常用的方法,使用起來也是復(fù)雜最低、代碼變更時靈活度較好方法,方式二AvroParquetWriters.forSpecificRecord使用起來復(fù)雜度較高,但是代碼變更的時候靈活度相對較好的方法,方式一AvroParquetWriters.forGenericRecord使用起來比較麻煩,而且代碼變更時需要更改的也比較多,這里主要介紹方式二和方式三的使用方式.

要說明一點再Flink1.15.3中是通過AvroParquetWriters來構(gòu)造ParquetWriterFactory,如果是早期版本的Flink可能是要通過ParquetAvroWriters來進(jìn)行構(gòu)造,當(dāng)然在1.15.3中也可以通過這個方式進(jìn)行構(gòu)造,不過ParquetAvroWriters已經(jīng)標(biāo)注為過時并且建議使用AvroParquetWriters

源碼內(nèi)容如下:

/**
 * Convenience builder to create {@link ParquetWriterFactory} instances for the different Avro
 * types.
 *
 * @deprecated use {@link AvroParquetWriters} instead. // 看這部分是建議使用AvroParquetWriters
 */
@Deprecated // 這里已經(jīng)標(biāo)注了過時
public class ParquetAvroWriters {

    /**
     * Creates a ParquetWriterFactory for an Avro specific type. The Parquet writers will use the
     * schema of that specific type to build and write the columnar data.
     *
     * @param type The class of the type to write.
     */
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(
            Class<T> type) {
        return AvroParquetWriters.forSpecificRecord(type);
    }
  • AvroParquetWriters.forReflectRecord(方式三)

    這里就先介紹一下AvroParquetWriters.forReflectRecord的使用方式,我們在使用FileSink時最好配合Checkpoint使用,不然文件只會出現(xiàn)inprogress狀態(tài),感興趣的可以自己實驗一下,我在Flink中FileSink的使用演示了加Checkpoint和不加Checkpoint的區(qū)別感興趣的可以看一下.

    代碼模板內(nèi)容比較簡單,直接代碼演示:

    import com.jin.bean.User;
    import com.jin.schema.UserSchemaBean;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.ParquetWriterFactory;
    import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
    
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/6/28
     * @Description: 測試
     **/
    public class FlinkFileSinkForParquet {
        public static void main(String[] args) throws Exception {
            // 創(chuàng)建流環(huán)境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 設(shè)置并行度
            env.setParallelism(1);
    
            // 每30秒作為checkpoint的一個周期
            env.enableCheckpointing(30000);
            // 兩次checkpoint間隔最少是20秒
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);
            // 程序取消或者停止時不刪除checkpoint
            env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // checkpoint必須在60秒結(jié)束,否則將丟棄
            env.getCheckpointConfig().setCheckpointTimeout(60000);
            // 同一時間只能有一個checkpoint
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            // 設(shè)置EXACTLY_ONCE語義,默認(rèn)就是這個
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // checkpoint存儲位置
            env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
            
            // 添加數(shù)據(jù)源(這里使用的是自定義數(shù)據(jù)源CustomizeSource,方便測試)
            DataStreamSource<CustomizeBean> sourceStream = env.addSource(new CustomizeSource());
            // 將數(shù)據(jù)流中的數(shù)據(jù)存儲到bean對象中
            SingleOutputStreamOperator<User> userMapStream = sourceStream.map(bean -> new User(bean.getName(), bean.getAge(), bean.getGender(), bean.getHobbit()));
            // 構(gòu)建parquetWriterFactory
            ParquetWriterFactory<User> parquetWriterFactory2 = AvroParquetWriters.forReflectRecord(User.class);
            // 構(gòu)建FileSink
            FileSink<User> parquetFileSink = FileSink
                    // 使用Bulk模式,并配置路徑和對應(yīng)的schema
                    .forBulkFormat(new Path("/Users/xxx/data/testData/"), parquetWriterFactory2)
                    // 分桶策略,使用默認(rèn)的
                    .withBucketAssigner(new DateTimeBucketAssigner<User>())
                    // 每100毫秒檢查一次分桶
                    .withBucketCheckInterval(100)
                    // 滾動策略,Bulk的滾動策略只有一種,就是發(fā)生Checkpoint的時候才進(jìn)行滾動(為了保證列式文件的完整性)
                    .withRollingPolicy(OnCheckpointRollingPolicy.build())
                    .build();
            // 輸出到文件
            userMapStream.sinkTo(parquetFileSink);
            env.execute();
    
        }
    }
    
    @Getter
    @Setter
    @ToString
    @NoArgsConstructor
    @AllArgsConstructor
    class User {
        private String name;
        private int age;
        private String gender;
        private String hobbit;
    }
    

    代碼中注釋很詳細(xì)了,具體使用看注釋即可。這里說明一下為什么forBulkFormat的滾動策略只有OnCheckpointRollingPolicy而不是像forRowFormat那樣可以通過時間和文件大小來控制文件滾動,注釋中我也講了是為了保證列式存儲文件的完整性,因為列式文件中記錄了很多信息,并不想行式存儲文件一行一行的寫就行,寫到某一行直接停了也不影響文件的使用,而列式存儲文件中不單單是記錄了數(shù)據(jù)本身還有對應(yīng)的字段類型、文件頭信息、文件尾信息、切片索引等很多信息,如果在寫入數(shù)據(jù)時某一刻直接停止了,而文件還沒有生成完整的信息那就會導(dǎo)致這個列士存儲文件根本不具備使用性,是無法進(jìn)行解析的。

    就比如說ParquetFile,它的文件結(jié)構(gòu)如下圖
    flink寫parquet文件,FLink,flink,大數(shù)據(jù),java

    可以看到文件的結(jié)構(gòu)信息是很復(fù)雜的,如果感興了解一下可以看數(shù)據(jù)存儲格式這篇文章了解一下,這里就不細(xì)說了,內(nèi)容還是比較多的.

  • AvroParquetWriters.forSpecificRecord(方式二)

    forSpecificRecord的使用不像forReflectRecord那樣自定義一個bean接收數(shù)據(jù)就行了,使用forSpecificRecord還要結(jié)合一下Apache avro的官網(wǎng)看一下,下面我就介紹一下如何使用forSpecificRecord.

    avro的使用有兩種方式一是通過API直接調(diào)用的方式,二通過配置avsc文件然后進(jìn)行編譯的方式,在代碼中我們使用的第二種方式,使用第一種方式同樣會出現(xiàn)很多schema的信息在代碼中寫死修改起來會比較復(fù)雜的問題,而且對avroAPI也要足夠熟悉,學(xué)習(xí)成本還是有的.

    1. resource目錄中創(chuàng)建avsc文件,文件內(nèi)容如下

      {
        "namespace": "com.jin.schema",
        "type": "record",
        "name": "UserSchemaBean",
        "fields": [
          {"name": "name", "type": "string"},
          {"name": "age", "type": "int"},
          {"name": "gender",  "type": "string"},
          {"name": "hobbit", "type": "string"}
        ]
      }
      

      文件中的內(nèi)容就是schema信息,這里我相信大家都能看得明白."namespace": "com.jin.schema"編譯后自動創(chuàng)建的bean的存儲位置,"name": "UserSchemaBean"就是配置生成bean的名稱,fields中就是配置生成bean的成員變量和對應(yīng)的數(shù)據(jù)類型.

      官網(wǎng)演示的avsc文件內(nèi)容如下:

      {"namespace": "example.avro",
       "type": "record",
       "name": "User",
       "fields": [
           {"name": "name", "type": "string"},
           {"name": "favorite_number",  "type": ["int", "null"]},
           {"name": "favorite_color", "type": ["string", "null"]}
       ]
      }
      

      編譯后就會根據(jù)avsc文件中的schema信息在配置好的目錄中自動創(chuàng)建bean.

    2. Maven中添加avsc文件編譯插件

      官網(wǎng)內(nèi)容如下:

      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.11.1</version>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
      

      要注意<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>是已經(jīng)配置完的avsc文件的位置,像是我就是在原有的resource目錄下配置的就要將內(nèi)容改成<sourceDirectory>${project.basedir}/src/main/resource/</sourceDirectory>否則在編譯時就會報錯找不到對應(yīng)的目錄或文件,如果想直接使用<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>那就在項目的main目錄下創(chuàng)建一個avro目錄并將目錄性質(zhì)改為Source root(這個如果不會可自行百度,關(guān)鍵字我都已經(jīng)提供了).

      我的項目中實際配置如下:

                  <!-- avro插件 -->
                  <plugin>
                      <groupId>org.apache.avro</groupId>
                      <artifactId>avro-maven-plugin</artifactId>
                      <version>1.10.0</version>
                      <executions>
                          <execution>
                              <phase>generate-sources</phase>
                              <goals>
                                  <goal>schema</goal>
                              </goals>
                              <configuration>
                                  <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                                  <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
      

      選擇插件的版本時要注意依賴沖突問題,我們要先看一下Flink的flink-avro下的org.apache.avro:avro是什么版本,如下圖:
      flink寫parquet文件,FLink,flink,大數(shù)據(jù),java

      可以看到1.15.3org.apache.avro:avro的版本是1.10.0,所以我選擇的插件也是這個版本.

    3. 編譯

      上面步驟都完成了就可以進(jìn)行編譯了,Maven->Lifecycle->compile,這里看一下編譯后的結(jié)果如下圖:
      flink寫parquet文件,FLink,flink,大數(shù)據(jù),java

      可以看到已經(jīng)根據(jù)我們配置的avsc文件自動創(chuàng)建了對應(yīng)的bean,這里看一下成員變量內(nèi)容是否一致,如下:

        /**
         * All-args constructor.
         * @param name The new value for name
         * @param age The new value for age
         * @param gender The new value for gender
         * @param hobbit The new value for hobbit
         */
        public UserSchemaBean(java.lang.CharSequence name, java.lang.Integer age, java.lang.CharSequence gender, java.lang.CharSequence hobbit) {
          this.name = name;
          this.age = age;
          this.gender = gender;
          this.hobbit = hobbit;
        }
      

      可以看到成員變量信息也是完全一致,我這里值展示了小部分代碼,編譯后的bean中的代碼信息很多,不過我們不用關(guān)心這個,懂與不懂都不影響使用.

    4. 代碼內(nèi)容

      接下來就到主題了,實際的代碼內(nèi)容如下:

      import com.jin.schema.UserSchemaBean;
      import org.apache.flink.connector.file.sink.FileSink;
      import org.apache.flink.core.fs.Path;
      import org.apache.flink.formats.parquet.ParquetWriterFactory;
      import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.CheckpointConfig;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
      import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
      
      /**
       * @Author: J
       * @Version: 1.0
       * @CreateTime: 2023/6/28
       * @Description: 測試
       **/
      public class FlinkFileSinkForParquet {
          public static void main(String[] args) throws Exception {
              // 創(chuàng)建流環(huán)境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // 設(shè)置并行度
              env.setParallelism(1);
      
              // 每30秒作為checkpoint的一個周期
              env.enableCheckpointing(30000);
              // 兩次checkpoint間隔最少是20秒
              env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);
              // 程序取消或者停止時不刪除checkpoint
              env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
              // checkpoint必須在60秒結(jié)束,否則將丟棄
              env.getCheckpointConfig().setCheckpointTimeout(60000);
              // 同一時間只能有一個checkpoint
              env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
              // 設(shè)置EXACTLY_ONCE語義,默認(rèn)就是這個
              env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
              // checkpoint存儲位置
              env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
      
              // 添加數(shù)據(jù)源(這里使用的是自定義數(shù)據(jù)源,方便測試)
              DataStreamSource<CustomizeBean> sourceStream = env.addSource(new CustomizeSource());
              // 將數(shù)據(jù)流中的對象轉(zhuǎn)成UserSchemaBean類型
              SingleOutputStreamOperator<UserSchemaBean> mapStream = sourceStream.map(bean -> new UserSchemaBean(bean.getName(), bean.getAge(), bean.getGender(), bean.getHobbit()));
              // 構(gòu)建parquetWriterFactory,這里傳入的就是編譯后的UserSchemaBean
              ParquetWriterFactory<UserSchemaBean> parquetWriterFactory = AvroParquetWriters.forSpecificRecord(UserSchemaBean.class);
              // 構(gòu)建FileSink
              FileSink<UserSchemaBean> parquetFileSink = FileSink
                      // 使用Bulk模式,并配置路徑和對應(yīng)的schema
                      .forBulkFormat(new Path("/Users/xxx/data/testData/"), parquetWriterFactory)
                      // 分桶策略,使用默認(rèn)的
                      .withBucketAssigner(new DateTimeBucketAssigner<UserSchemaBean>())
                      // 每100毫秒檢查一次分桶
                      .withBucketCheckInterval(100)
                      // 滾動策略,Bulk的滾動策略只有一種,就是發(fā)生Checkpoint的時候才進(jìn)行滾動(為了保證列式文件的完整性)
                      .withRollingPolicy(OnCheckpointRollingPolicy.build())
                      .build();
              // 輸出到文件
              mapStream.sinkTo(parquetFileSink);
              env.execute();
      
          }
      }
      

      通過代碼我們可以看到,內(nèi)容基本就是一致的無非就是forSpecificRecord傳入的bean不同而已,當(dāng)然還是建議使用AvroParquetWriters.forReflectRecord這種方式,簡易高效,復(fù)雜的過程并不一定能提高我們的代碼能力.

      到這里這兩種方式我都介紹完了,希望看完這篇文章有所收獲.文章來源地址http://www.zghlxwxcb.cn/news/detail-770193.html

到了這里,關(guān)于Flink之FileSink將數(shù)據(jù)寫入parquet文件的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink將數(shù)據(jù)寫入MySQL(JDBC)

    Flink將數(shù)據(jù)寫入MySQL(JDBC)

    在實際的生產(chǎn)環(huán)境中,我們經(jīng)常會把Flink處理的數(shù)據(jù)寫入MySQL、Doris等數(shù)據(jù)庫中,下面以MySQL為例,使用JDBC的方式將Flink的數(shù)據(jù)實時數(shù)據(jù)寫入MySQL。 2.1 版本說明 2.2 導(dǎo)入相關(guān)依賴 2.3 連接數(shù)據(jù)庫,創(chuàng)建表 2.4 創(chuàng)建POJO類 2.5 自定義map函數(shù) 2.5 Flink2MySQL 2.6 啟動necat、Flink,觀察數(shù)據(jù)庫寫

    2024年02月07日
    瀏覽(15)
  • 6.2、Flink數(shù)據(jù)寫入到Kafka

    6.2、Flink數(shù)據(jù)寫入到Kafka

    目錄 1、添加POM依賴 2、API使用說明 3、序列化器 3.1 使用預(yù)定義的序列化器 3.2 使用自定義的序列化器 4、容錯保證級別 4.1?至少一次 的配置 4.2?精確一次 的配置 5、這是一個完整的入門案例 Apache Flink 集成了通用的 Kafka 連接器,使用時需要根據(jù)生產(chǎn)環(huán)境的版本引入相應(yīng)的依賴

    2024年02月09日
    瀏覽(15)
  • 【Flink-Kafka-To-Hive】使用 Flink 實現(xiàn) Kafka 數(shù)據(jù)寫入 Hive

    需求描述: 1、數(shù)據(jù)從 Kafka 寫入 Hive。 2、相關(guān)配置存放于 Mysql 中,通過 Mysql 進(jìn)行動態(tài)讀取。 3、此案例中的 Kafka 是進(jìn)行了 Kerberos 安全認(rèn)證的,如果不需要自行修改。 4、Flink 集成 Kafka 寫入 Hive 需要進(jìn)行 checkpoint 才能落盤至 HDFS。 5、先在 Hive 中創(chuàng)建表然后動態(tài)獲取 Hive 的表

    2024年02月03日
    瀏覽(23)
  • 【Flink-Kafka-To-ClickHouse】使用 Flink 實現(xiàn) Kafka 數(shù)據(jù)寫入 ClickHouse

    需求描述: 1、數(shù)據(jù)從 Kafka 寫入 ClickHouse。 2、相關(guān)配置存放于 Mysql 中,通過 Mysql 進(jìn)行動態(tài)讀取。 3、此案例中的 Kafka 是進(jìn)行了 Kerberos 安全認(rèn)證的,如果不需要自行修改。 4、先在 ClickHouse 中創(chuàng)建表然后動態(tài)獲取 ClickHouse 的表結(jié)構(gòu)。 5、Kafka 數(shù)據(jù)為 Json 格式,通過 FlatMap 扁平

    2024年02月03日
    瀏覽(23)
  • 【Flink】【ClickHouse】寫入流式數(shù)據(jù)到ClickHouse

    【Flink】【ClickHouse】寫入流式數(shù)據(jù)到ClickHouse

    Flink 安裝的教程就不在這里贅敘了,可以看一下以前的文章,這篇文章主要是把流式數(shù)據(jù)寫入的OLAP(ClickHouse)中作查詢分析 Flink 1.13.2, ClickHouse?22.1.3.7 這里直接使用docker安裝,沒有安裝的同學(xué)可以使用homebreak來安裝,執(zhí)行下面的命令即可( 已經(jīng)安裝了docker的可以忽略 ) 四指

    2024年02月03日
    瀏覽(26)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定義 Sink 消費 Kafka 數(shù)據(jù)寫入 RocketMQ

    這里的 maven 依賴比較冗余,推薦大家都加上,后面陸續(xù)優(yōu)化。 注意: 1、此程序中所有的相關(guān)配置都是通過 Mysql 讀取的(生產(chǎn)環(huán)境中沒有直接寫死的,都是通過配置文件動態(tài)配置),大家實際測試過程中可以將相關(guān)配置信息寫死。 2、此程序中 Kafka 涉及到了 Kerberos 認(rèn)證操作

    2024年02月03日
    瀏覽(21)
  • 15_基于Flink將pulsar數(shù)據(jù)寫入到ClickHouse

    15_基于Flink將pulsar數(shù)據(jù)寫入到ClickHouse

    編寫Flink完成數(shù)據(jù)寫入到ClickHouse操作, 后續(xù)基于CK完成指標(biāo)統(tǒng)計操作 3.8.1.ClickHouse基本介紹 ClickHouse 是俄羅斯的Yandex于2016年開源的列式存儲數(shù)據(jù)庫(DBMS),使用C++語言編寫,主要用于在線分析處理查詢(OLAP),能夠使用SQL查詢實時生成分析數(shù)據(jù)報告。 結(jié)論: ClickHouse像很多OL

    2024年02月14日
    瀏覽(61)
  • 14_基于Flink將pulsar數(shù)據(jù)寫入到HBase

    14_基于Flink將pulsar數(shù)據(jù)寫入到HBase

    3.7.1.編寫Flink完成數(shù)據(jù)寫入到Hbase操作, 完成數(shù)據(jù)備份, 便于后續(xù)進(jìn)行即席查詢和離線分析 3.7.1.1.HBase基本介紹 hbase是基于Google發(fā)布bigTable論文產(chǎn)生一款軟件, 是一款noSQL型數(shù)據(jù), 不支持SQL. 不支持join的操作, 沒有表關(guān)系, 不支持事務(wù)(多行事務(wù)),hbase是基于 HDFS的采用java 語言編寫 查

    2024年02月13日
    瀏覽(66)
  • 【數(shù)據(jù)湖Hudi-10-Hudi集成Flink-讀取方式&限流&寫入方式&寫入模式&Bucket索引】

    【數(shù)據(jù)湖Hudi-10-Hudi集成Flink-讀取方式&限流&寫入方式&寫入模式&Bucket索引】

    當(dāng)前表默認(rèn)是快照讀取,即讀取最新的全量快照數(shù)據(jù)并一次性返回。通過參數(shù) read.streaming.enabled 參數(shù)開啟流讀模式,通過 read.start-commit 參數(shù)指定起始消費位置,支持指定 earliest 從最早消費。 1.with參數(shù) 名稱 Required 默認(rèn)值 說明 read.streaming.enabled false false 設(shè)置 true 開啟流讀模式

    2024年02月14日
    瀏覽(21)
  • 【Flink-Kafka-To-Mongo】使用 Flink 實現(xiàn) Kafka 數(shù)據(jù)寫入 Mongo(根據(jù)對應(yīng)操作類型進(jìn)行增、刪、改操作,寫入時對時間類型字段進(jìn)行單獨處理)

    需求描述: 1、數(shù)據(jù)從 Kafka 寫入 Mongo。 2、相關(guān)配置存放于 Mysql 中,通過 Mysql 進(jìn)行動態(tài)讀取。 3、此案例中的 Kafka 是進(jìn)行了 Kerberos 安全認(rèn)證的,如果不需要自行修改。 4、Kafka 數(shù)據(jù)為 Json 格式,獲取到的數(shù)據(jù)根據(jù)操作類型字段進(jìn)行增刪改操作。 5、讀取時使用自定義 Source,寫

    2024年02月22日
    瀏覽(31)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包