国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink異步io關(guān)聯(lián)Hbase

這篇具有很好參考價值的文章主要介紹了Flink異步io關(guān)聯(lián)Hbase。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

主程序

    public static void main(String[] args) throws Exception {
        //1.獲取流執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        //設(shè)置動態(tài)參數(shù)
        ParameterTool propertiesargs = ParameterTool.fromArgs(args);
        String fileName = propertiesargs.get("CephConfPath");
        //從hdfs獲取動態(tài)參數(shù)配置文件
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        FileSystem fs = FileSystem.get(URI.create(fileName), conf);
        fs.open(new org.apache.hadoop.fs.Path(fileName));
        ParameterTool propertiesFile = ParameterTool.fromPropertiesFile(fs.open(new org.apache.hadoop.fs.Path(fileName)).getWrappedStream());
        // 注冊給環(huán)境變量(HBASE使用)
        env.getConfig().setGlobalJobParameters(propertiesFile);
        new CephConfig(propertiesFile);

        //2.設(shè)置CK&狀態(tài)后端
        env.setStateBackend(new FsStateBackend(FSSTATEBACKEND));
        env.enableCheckpointing(10000);// 每 ** ms 開始一次 checkpoint
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 設(shè)置模式為精確一次
        env.getCheckpointConfig().setCheckpointTimeout(100000);// Checkpoint 必須在** ms內(nèi)完成,否則就會被拋棄
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);// 同一時間只允許一個 checkpoint 進(jìn)行
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 確認(rèn) checkpoints 之間的時間會進(jìn)行 ** ms
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重啟策略:重啟3次,間隔10s

        //3.從kafka中讀取日志信息,將將每行數(shù)據(jù)轉(zhuǎn)換為JavaBean對象 主流
        DataStreamSource<String> dataStream = env.addSource(KafkaUtils.getKafkaSource(KAFKA_SOURCE_TOPIC, KAFKA_SOURCE_GROUP));
        …………
        //8.讀取HBase中user表,進(jìn)行維度關(guān)聯(lián)
        SingleOutputStreamOperator<CephAccessRecord> record = AsyncDataStream.unorderedWait(
                validDS,
                new DimAsyncFunction<CephAccessRecord>() {
                    @Override
                    public String getKey(CephAccessRecord record) {
                        return record.access_key;
                    }
                },
                60, TimeUnit.SECONDS);
        BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));
        StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(
                new Path(HDFS_FILE_PATH),
                new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.DAYS.toMillis(1))//至少包含 20 分鐘的數(shù)據(jù)
                                .withInactivityInterval(TimeUnit.DAYS.toMillis(1 ))//最近 20 分鐘沒有收到新的數(shù)據(jù)
                                .withMaxPartSize(1024 * 1024 * 1024)//文件大小已達(dá)到 1 GB
                                .build())
                .withBucketAssigner(assigner)
                .build();

        // 將record-->過濾上傳數(shù)據(jù)-->轉(zhuǎn)換成jsonstring-->寫入到hdfs
//        allDataDS.filter(log->log.event_type.equals("upload")).map(line->JSON.toJSONString(line)).addSink(fileSink);
        dataStream.map(line->JSON.toJSONString(line)).addSink(fileSink);

        //10.流環(huán)境執(zhí)行
        env.execute();

異步關(guān)聯(lián)程序

package com.data.ceph.function;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;

import java.util.Collections;
import java.util.Map;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimAsyncJoinFunction<T> {

    private org.apache.hadoop.hbase.client.Connection connection = null;
    private ResultScanner rs = null;
    private Table table = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        //不啟用安全認(rèn)證
        System.setProperty("zookeeper.sasl.client", "false");
        Map<String, String> stringStringMap = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
        String hbase = stringStringMap.get("hbase_zookeeper_quorum");
        org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
        hconf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.23.37,172.16.23.38,172.16.23.39");
//        hconf.set(HConstants.ZOOKEEPER_QUORUM, hbase);
        hconf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
        hconf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase");

        //指定用戶名為hbase的用戶去訪問hbase服務(wù)
        UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("hive");
        connection = ConnectionFactory.createConnection(hconf, User.create(userGroupInformation));
        table = connection.getTable(TableName.valueOf("cloud:user_info"));
    }


    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
        Get get = new Get(Bytes.toBytes(getKey(input)));
        Result rs = table.get(get);
        for (Cell cell : rs.rawCells()) {
            String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
            String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            BeanUtils.setProperty(input, column, value);
        }
        resultFuture.complete(Collections.singletonList(input));
    }
    @Override
    public void close() throws Exception {
        if (rs != null) rs.close();
        if (table != null) table.close();
        if (connection != null) connection.close();
    }
    @Override
    public void timeout(T input, ResultFuture<T> resultFuture) throws Exception {
        System.out.println("TimeOut:" + input);
    }
}

文章來源地址http://www.zghlxwxcb.cn/news/detail-840649.html

到了這里,關(guān)于Flink異步io關(guān)聯(lián)Hbase的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive

    Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建 下載 https://archive.apache.org/dist/ ?Mysql下載地址 Index of /MySQL/Downloads/ 我最終選擇 Zookeeper3.7.1 +Hadoop3.3.5 + Spark-3.2.4 + Flink-1.16.1 + Kafka2.12-3.4.0 + HBase2.4.17 + Hive3.1.3 ?+JDK1.8.0_391 ?IP規(guī)劃 IP hostname 192.168.1.5 node1 192.168.1.6 node

    2024年01月23日
    瀏覽(31)
  • Flink異步IO

    Flink異步IO

    本文講解 Flink 用于訪問外部數(shù)據(jù)存儲的異步 I/O API。對于不熟悉異步或者事件驅(qū)動編程的用戶,建議先儲備一些關(guān)于 Future 和事件驅(qū)動編程的知識。 本文代碼gitee地址: https://gitee.com/ddxygq/BigDataTechnical/blob/main/Flink/src/main/java/operator/AsyncIODemo.java 在與外部系統(tǒng)交互(用數(shù)據(jù)庫中

    2024年02月02日
    瀏覽(18)
  • Flink異步IO初步了解

    Flink異步IO初步了解

    ? ? ? ? 之前使用Flink查詢Redis數(shù)據(jù)的過程中,由于對數(shù)據(jù)一致性的要求并不是很高,當(dāng)時是用MapFunction +? State 的方案。先緩存一大堆數(shù)據(jù)到State中,達(dá)到一定數(shù)量之后,將批量Key提交到Redis中進(jìn)行查詢。 ????????由于Redis性能極高,所以并沒有出現(xiàn)什么問題,后來了解到了

    2024年02月03日
    瀏覽(18)
  • 基于Flume+Kafka+Hbase+Flink+FineBI的實(shí)時綜合案例(五)FineBI可視化

    基于Flume+Kafka+Hbase+Flink+FineBI的實(shí)時綜合案例(五)FineBI可視化

    目標(biāo) : 實(shí)現(xiàn)FineBI訪問MySQL結(jié)果數(shù)據(jù)集的配置 實(shí)施 安裝FineBI 參考《FineBI Windows版本安裝手冊.docx》安裝FineBI 配置連接 數(shù)據(jù)準(zhǔn)備 小結(jié) 實(shí)現(xiàn)FineBI訪問MySQL結(jié)果數(shù)據(jù)集的配置 目標(biāo) : 實(shí)現(xiàn)FineBI實(shí)時報表構(gòu)建 路徑 step1:實(shí)時報表構(gòu)建 step2:實(shí)時報表配置 step3:實(shí)時刷新測試 實(shí)施 實(shí)

    2024年02月04日
    瀏覽(41)
  • Flink連接Hbase時的kafka報錯:java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils

    Flink連接Hbase時的kafka報錯:java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/ThreadUtils

    書接上文 【Flink實(shí)時數(shù)倉】需求一:用戶屬性維表處理-Flink CDC 連接 MySQL 至 Hbase 實(shí)驗(yàn)及報錯分析http://t.csdn.cn/bk96r 我隔了一天跑Hbase中的數(shù)據(jù),發(fā)現(xiàn)kafka報錯,但是kafka在這個代碼段中并沒有使用,原因就是我在今天的其他項(xiàng)目中添加的kafka依賴導(dǎo)致了沖突。 注釋掉kafka依賴,

    2024年02月04日
    瀏覽(36)
  • Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建(保姆級超詳細(xì)含圖文)

    Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建(保姆級超詳細(xì)含圖文)

    說明: 本篇將詳細(xì)介紹用二進(jìn)制安裝包部署hadoop等組件,注意事項(xiàng),各組件的使用,常用的一些命令,以及在部署中遇到的問題解決思路等等,都將詳細(xì)介紹。 ip hostname 192.168.1.11 node1 192.168.1.12 node2 192.168.1.13 node3 1.2.1系統(tǒng)版本 1.2.2內(nèi)存建議最少4g、2cpu、50G以上的磁盤容量 本次

    2024年02月12日
    瀏覽(37)
  • Linux多虛擬機(jī)集群化配置詳解(Zookeeper集群、Kafka集群、Hadoop集群、HBase集群、Spark集群、Flink集群、Zabbix、Grafana部署)

    Linux多虛擬機(jī)集群化配置詳解(Zookeeper集群、Kafka集群、Hadoop集群、HBase集群、Spark集群、Flink集群、Zabbix、Grafana部署)

    前面安裝的軟件,都是以單機(jī)模式運(yùn)行的,學(xué)習(xí)大數(shù)據(jù)相關(guān)的軟件部署,后續(xù)安裝軟件服務(wù),大多數(shù)都是以集群化(多臺服務(wù)器共同工作)模式運(yùn)行的。所以,需要完成集群化環(huán)境的前置準(zhǔn)備,包括創(chuàng)建多臺虛擬機(jī),配置主機(jī)名映射,SSH免密登錄等等。 我們可以使用VMware提供

    2024年02月04日
    瀏覽(30)
  • 【flink番外篇】14、Flink異步I/O訪問外部數(shù)據(jù)示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月16日
    瀏覽(21)
  • 55、Flink之用于外部數(shù)據(jù)訪問的異步 I/O介紹及示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月17日
    瀏覽(17)
  • 輕松通關(guān)Flink第19講:Flink 如何做維表關(guān)聯(lián)

    在實(shí)際生產(chǎn)中,我們經(jīng)常會有這樣的需求,需要以原始數(shù)據(jù)流作為基礎(chǔ),然后關(guān)聯(lián)大量的外部表來補(bǔ)充一些屬性。例如,我們在訂單數(shù)據(jù)中,希望能得到訂單收貨人所在省的名稱,一般來說訂單中會記錄一個省的 ID,那么需要根據(jù) ID 去查詢外部的維度表補(bǔ)充省名稱屬性。 在

    2024年02月13日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包