国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

14_基于Flink將pulsar數(shù)據(jù)寫入到HBase

這篇具有很好參考價(jià)值的文章主要介紹了14_基于Flink將pulsar數(shù)據(jù)寫入到HBase。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

3.7.基于Flink將數(shù)據(jù)寫入到HBase

3.7.1.編寫Flink完成數(shù)據(jù)寫入到Hbase操作, 完成數(shù)據(jù)備份, 便于后續(xù)進(jìn)行即席查詢和離線分析

3.7.1.1.HBase基本介紹

hbase是基于Google發(fā)布bigTable論文產(chǎn)生一款軟件, 是一款noSQL型數(shù)據(jù), 不支持SQL. 不支持join的操作, 沒有表關(guān)系, 不支持事務(wù)(多行事務(wù)),hbase是基于 HDFS的采用java 語(yǔ)言編寫

查詢hbase數(shù)據(jù)一般有三種方案(主鍵(row key)查詢, 主鍵的范圍檢索,查詢?nèi)繑?shù)據(jù))

都是以字節(jié)類型存儲(chǔ),存儲(chǔ)結(jié)構(gòu)化和半結(jié)構(gòu)化數(shù)據(jù)。

hbase表的特點(diǎn): 大 面向列的存儲(chǔ)方案 稀疏性

2.7.1.2.應(yīng)用場(chǎng)景

1)需要進(jìn)行隨機(jī)讀寫的操作。
2)數(shù)據(jù)量比較大。
3)數(shù)據(jù)比較稀疏。

2.7.1.3.HBase安裝操作

本次安裝的HBase為2.2.7,詳細(xì)的安裝手冊(cè)大家可以參考資料, 還需要大家注意,HBase的啟動(dòng)需要依賴于zookeeper
和HDFS的, 顧需要先安裝 HADOOP與zookeeper
14_基于Flink將pulsar數(shù)據(jù)寫入到HBase,# Apache Pulsar,pulsar

  • 1-在Hbase中創(chuàng)建目標(biāo)表
create 'itcast_h_ems, {NAME=>'f1',COMPRESSION=>'GZ'},{NUMREGIONS=>6, SPLITALGO=>'HexStringSplit'}
  • 2- 編寫Flink代碼完成寫入Hbase操作
import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Properties;

// 基于Flink消費(fèi)Pulsar數(shù)據(jù), 然后將數(shù)據(jù)灌入到HBase中, 完成數(shù)據(jù)備份, 以及后續(xù)即席查詢和離線分析
public class ItcastFlinkToHBase {

    public static void main(String[] args) throws Exception {

        //1. 創(chuàng)建Flinnk流式處理核心環(huán)境類對(duì)象 和 Table API 核心環(huán)境類對(duì)象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. 添加Source組件, 從Pulsar中讀取消息數(shù)據(jù)
        Properties props = new Properties();
        props.setProperty("topic","persistent://public/default/itcast_ems_tab");
        props.setProperty("partition.discovery.interval-millis","5000");
        FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>(
                "pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",
                JsonDeser.of(PulsarTopicPojo.class),props);
        //2.1 設(shè)置pulsarSource組件在消費(fèi)數(shù)據(jù)的時(shí)候, 默認(rèn)從什么位置開始消費(fèi)
        pulsarSource.setStartFromLatest();

        DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);


        //2.2 轉(zhuǎn)換為Flink Table

        Schema schema = Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("sid", DataTypes.STRING())
                .column("ip", DataTypes.STRING())
                .column("session_id", DataTypes.STRING())
                .column("create_time", DataTypes.STRING())
                .column("yearInfo", DataTypes.STRING())
                .column("monthInfo", DataTypes.STRING())
                .column("dayInfo", DataTypes.STRING())
                .column("hourInfo", DataTypes.STRING())
                .column("seo_source", DataTypes.STRING())
                .column("area", DataTypes.STRING())
                .column("origin_channel", DataTypes.STRING())
                .column("msg_count", DataTypes.INT())
                .column("from_url", DataTypes.STRING())
                .build();


        tableEnv.createTemporaryView("itcast_ems",dataStreamSource,schema);


        //2.3: 定義HBase的目標(biāo)表
        String hTable = "create table itcast_h_ems("+
                "rowkey int,"+
                "f1 ROW<sid STRING,ip STRING,session_id STRING,create_time STRING,yearInfo STRING,monthInfo STRING,dayInfo STRING,hourInfo STRING,seo_source STRING,area STRING,origin_channel STRING,msg_count INT,from_url STRING>,"+
                "primary key(rowkey) NOT ENFORCED" +
                ") WITH ("+
                "'connector'='hbase-2.2',"+
                "'table-name'='itcast_h_ems',"+
                "'zookeeper.quorum'='node1:2181,node2:2181,node3:2181'"+
                ")";
        //4. 執(zhí)行操作
        tableEnv.executeSql(hTable);

        tableEnv.executeSql("insert into itcast_h_ems select id,ROW(sid,ip,session_id,create_time,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) from itcast_ems");

    }

}

PulsarTopicPojo文章來源地址http://www.zghlxwxcb.cn/news/detail-635199.html

public class PulsarTopicPojo {
    private Integer id;
    private String sid;
    private String ip;
    private String session_id;
    private String create_time;
    private String yearInfo;
    private String monthInfo;
    private String dayInfo;
    private String hourInfo;
    private String seo_source;
    private String area;
    private String origin_channel;
    private Integer msg_count;
    private  String from_url;

    public PulsarTopicPojo() {
    }

    public PulsarTopicPojo(Integer id, String sid, String ip, String session_id, String create_time, String yearInfo, String monthInfo, String dayInfo, String hourInfo, String seo_source, String area, String origin_channel, Integer msg_count, String from_url) {
        this.id = id;
        this.sid = sid;
        this.ip = ip;
        this.session_id = session_id;
        this.create_time = create_time;
        this.yearInfo = yearInfo;
        this.monthInfo = monthInfo;
        this.dayInfo = dayInfo;
        this.hourInfo = hourInfo;
        this.seo_source = seo_source;
        this.area = area;
        this.origin_channel = origin_channel;
        this.msg_count = msg_count;
        this.from_url = from_url;
    }

    public void setData(Integer id, String sid, String ip, String session_id, String create_time, String yearInfo, String monthInfo, String dayInfo, String hourInfo, String seo_source, String area, String origin_channel, Integer msg_count, String from_url) {
        this.id = id;
        this.sid = sid;
        this.ip = ip;
        this.session_id = session_id;
        this.create_time = create_time;
        this.yearInfo = yearInfo;
        this.monthInfo = monthInfo;
        this.dayInfo = dayInfo;
        this.hourInfo = hourInfo;
        this.seo_source = seo_source;
        this.area = area;
        this.origin_channel = origin_channel;
        this.msg_count = msg_count;
        this.from_url = from_url;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getSid() {
        return sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getSession_id() {
        return session_id;
    }

    public void setSession_id(String session_id) {
        this.session_id = session_id;
    }
    public String getCreate_time() {
        return create_time;
    }

    public void setCreate_time(String create_time) {
        this.create_time = create_time;
    }
    public String getYearInfo() {
        return yearInfo;
    }

    public void setYearInfo(String yearInfo) {
        this.yearInfo = yearInfo;
    }

    public String getMonthInfo() {
        return monthInfo;
    }

    public void setMonthInfo(String monthInfo) {
        this.monthInfo = monthInfo;
    }

    public String getDayInfo() {
        return dayInfo;
    }

    public void setDayInfo(String dayInfo) {
        this.dayInfo = dayInfo;
    }

    public String getHourInfo() {
        return hourInfo;
    }

    public void setHourInfo(String hourInfo) {
        this.hourInfo = hourInfo;
    }

    public String getSeo_source() {
        return seo_source;
    }

    public void setSeo_source(String seo_source) {
        this.seo_source = seo_source;
    }

    public String getArea() {
        return area;
    }

    public void setArea(String area) {
        this.area = area;
    }

    public String getOrigin_channel() {
        return origin_channel;
    }

    public void setOrigin_channel(String origin_channel) {
        this.origin_channel = origin_channel;
    }

    public Integer getMsg_count() {
        return msg_count;
    }

    public void setMsg_count(Integer msg_count) {
        this.msg_count = msg_count;
    }

    public String getFrom_url() {
        return from_url;
    }

    public void setFrom_url(String from_url) {
        this.from_url = from_url;
    }

    @Override
    public String toString() {
        return "PulsarTopicPojo{" +
                "id=" + id +
                ", sid='" + sid + '\'' +
                ", ip='" + ip + '\'' +
                ", session_id='" + session_id + '\'' +
                ", create_time='" + create_time + '\'' +
                ", yearInfo='" + yearInfo + '\'' +
                ", monthInfo='" + monthInfo + '\'' +
                ", dayInfo='" + dayInfo + '\'' +
                ", hourInfo='" + hourInfo + '\'' +
                ", seo_source='" + seo_source + '\'' +
                ", area='" + area + '\'' +
                ", origin_channel='" + origin_channel + '\'' +
                ", msg_count=" + msg_count +
                ", from_url='" + from_url + '\'' +
                '}';
    }
}

到了這里,關(guān)于14_基于Flink將pulsar數(shù)據(jù)寫入到HBase的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Apache-Pulsar安裝操作說明

    Apache-Pulsar安裝操作說明

    Pulsar 是一種用于服務(wù)器到服務(wù)器消息傳遞的多租戶高性能解決方案。 Pulsar 的主要特性如下: 對(duì) Pulsar 實(shí)例中的多個(gè)集群的本機(jī)支持,并跨集群無縫地復(fù)制消息。 極低的發(fā)布和端到端延遲。 無縫可擴(kuò)展至超過一百萬個(gè)主題。 一個(gè)簡(jiǎn)單的客戶端 API,具有Java、Go、Python和C++的綁

    2024年04月14日
    瀏覽(27)
  • Pulsar's Integration with Apache Samza for Stateful Stream Processing

    隨著數(shù)據(jù)的增長(zhǎng)和復(fù)雜性,流處理技術(shù)變得越來越重要。流處理系統(tǒng)允許實(shí)時(shí)分析大規(guī)模的、高速變化的數(shù)據(jù)流。Apache Pulsar 是一個(gè)高性能的分布式消息系統(tǒng),適用于流處理和批處理。Apache Samza 是一個(gè)用于有狀態(tài)流處理的系統(tǒng),它可以與 Pulsar 集成,以實(shí)現(xiàn)高效的狀態(tài)流處理

    2024年04月14日
    瀏覽(26)
  • Apache Pulsar 技術(shù)系列 - GEO replication 中訂閱狀態(tài)的同步原理

    Apache Pulsar 技術(shù)系列 - GEO replication 中訂閱狀態(tài)的同步原理

    Apache Pulsar 是一個(gè)多租戶、高性能的服務(wù)間消息傳輸解決方案,支持多租戶、低延時(shí)、讀寫分離、跨地域復(fù)制(GEO Replication)、快速擴(kuò)容、靈活容錯(cuò)等特性,GEO Replication 可以原生支持?jǐn)?shù)據(jù)和訂閱狀態(tài)在多個(gè)集群之間進(jìn)行復(fù)制,GEO 目前在 Apache InLong 內(nèi)部已經(jīng)有長(zhǎng)期穩(wěn)定的實(shí)踐,

    2024年02月16日
    瀏覽(20)
  • 消息隊(duì)列之六脈神劍:RabbitMQ、Kafka、ActiveMQ 、Redis、 ZeroMQ、Apache Pulsar對(duì)比和如何使用

    消息隊(duì)列(Message Queue)是一種異步通信機(jī)制,它將消息發(fā)送者和接收者解耦,從而提高了應(yīng)用程序的性能、可擴(kuò)展性和可靠性。在分布式系統(tǒng)中,消息隊(duì)列經(jīng)常被用于處理高并發(fā)、異步處理、應(yīng)用解耦等場(chǎng)景。 本篇回答將分析比較常見的六種消息隊(duì)列:RabbitMQ、Kafka、Active

    2024年02月14日
    瀏覽(19)
  • 與ChatGPT淺聊Pulsar

    我 : 艾米麗,談一談你對(duì)Pulsar的理解? ChatGPT : 當(dāng)然可以!Apache Pulsar是一款分布式消息中間件,它支持多種消息模式,包括發(fā)布/訂閱模式、隊(duì)列模式和流模式。在發(fā)布/訂閱模式下,消息發(fā)布者將消息發(fā)布到一個(gè)主題中,訂閱者可以訂閱該主題,并接收到所有發(fā)布到該主題

    2023年04月15日
    瀏覽(20)
  • Pulsar-架構(gòu)與設(shè)計(jì)

    Pulsar-架構(gòu)與設(shè)計(jì)

    隨著云原生的興起,對(duì)消息中間件的伸縮性和多租戶隔離有了更高的要求。現(xiàn)有的消息中間件不支持多租戶的隔離,但是有一定伸縮性,需要一定的遷移工具支持和手工操作。 Pulsar是下一代云原生分布式消息平臺(tái),采用存儲(chǔ)和計(jì)算分離架構(gòu)設(shè)計(jì),支持彈性伸縮,支持多租戶、

    2024年02月22日
    瀏覽(20)
  • Pulsar的消費(fèi)模式

    Pulsar 提供了三種消費(fèi)模式:獨(dú)立消費(fèi)者模式、共享訂閱模式和發(fā)布訂閱模式。 1: 獨(dú)立消費(fèi)者模式:每個(gè)消費(fèi)者實(shí)例都會(huì)獨(dú)立地消費(fèi)消息,并且每個(gè)消息只會(huì)被一個(gè)消費(fèi)者消費(fèi)。這種模式適合于需要完全獨(dú)立處理消息的場(chǎng)景,例如數(shù)據(jù)采集和日志處理。 在這個(gè)示例中,我們創(chuàng)

    2024年02月11日
    瀏覽(15)
  • Pulsar消息發(fā)送、消費(fèi)架構(gòu)概述

    Pulsar消息發(fā)送、消費(fèi)架構(gòu)概述

    大家好,我是威哥,《RocketMQ技術(shù)內(nèi)幕》、《RocketMQ實(shí)戰(zhàn)》作者、RocketMQ社區(qū)首席布道師、極客時(shí)間《中間件核心技術(shù)與實(shí)戰(zhàn)》專欄作者、中通快遞基礎(chǔ)架構(gòu)資深架構(gòu)師, 越努力越幸運(yùn),唯有堅(jiān)持不懈 ,與大家共勉。 Pulsar基于發(fā)布-訂閱模式,消息發(fā)送者向主題發(fā)送消息,而消

    2024年02月09日
    瀏覽(20)
  • pulsar集群搭建_親測(cè)成功

    pulsar集群搭建_親測(cè)成功 單機(jī)運(yùn)行請(qǐng)看: Linux MacBook單機(jī)部署Pulsar并開啟認(rèn)證功能 集群組成 搭建 Pulsar 集群至少需要 3 個(gè)組件:ZooKeeper 集群、BookKeeper 集群和 broker 集群(Broker 是 Pulsar 的自身實(shí)例)。這三個(gè)集群組件如下: ZooKeeper 集群(3(或多) 個(gè) ZooKeeper 節(jié)點(diǎn)組成) bookie 集群(

    2024年02月09日
    瀏覽(14)
  • HBase Shell操作&Flink寫入HBase

    1)進(jìn)入HBase客戶端命令行 2)查看幫助命令 3)查看當(dāng)前數(shù)據(jù)庫(kù)中有哪些表 1)創(chuàng)建表 2)插入數(shù)據(jù)到表 3)掃描查看表數(shù)據(jù) 4)查看表結(jié)構(gòu) 5)更新指定字段的數(shù)據(jù) 6)查看“指定行”或“指定列族:列”的數(shù)據(jù) 7)統(tǒng)計(jì)表數(shù)據(jù)行數(shù) 8)刪除數(shù)據(jù) 9)清空表數(shù)據(jù) 10)刪除表 11)變更

    2024年02月04日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包