国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

15_基于Flink將pulsar數(shù)據(jù)寫入到ClickHouse

這篇具有很好參考價值的文章主要介紹了15_基于Flink將pulsar數(shù)據(jù)寫入到ClickHouse。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

3.8.基于Flink將數(shù)據(jù)寫入到ClickHouse

編寫Flink完成數(shù)據(jù)寫入到ClickHouse操作, 后續(xù)基于CK完成指標統(tǒng)計操作

3.8.1.ClickHouse基本介紹

ClickHouse 是俄羅斯的Yandex于2016年開源的列式存儲數(shù)據(jù)庫(DBMS),使用C++語言編寫,主要用于在線分析處理查詢(OLAP),能夠使用SQL查詢實時生成分析數(shù)據(jù)報告。
15_基于Flink將pulsar數(shù)據(jù)寫入到ClickHouse,# Apache Pulsar,pulsar
結(jié)論: ClickHouse像很多OLAP數(shù)據(jù)庫一樣,單表查詢速度由于關(guān)聯(lián)查詢,而且ClickHouse的兩者差距更為明顯。

3.8.2.ClickHouse安裝步驟

本項目中,我們僅需要安裝單機測試版本即可使用(node2安裝), 在實際生產(chǎn)中, 大家可以直接將分布式集群版本

  • 1-設(shè)置yum源
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
  • 2- 直接基于yum安裝即可
sudo yum install clickhouse-server clickhouse-client
  • 3-修改配置文件
vim /etc/clickhouse-server/config.xml 
修改178行: 打開這一行的注釋 
<listen_host>::</listen_host>

15_基于Flink將pulsar數(shù)據(jù)寫入到ClickHouse,# Apache Pulsar,pulsar

  • 4-啟動clickhouse的server
systemctl start clickhouse-server 
停止:
systemctl stop clickhouse-server 
重啟
systemctl restart clickhouse-server
  • 5-進入客戶端
    15_基于Flink將pulsar數(shù)據(jù)寫入到ClickHouse,# Apache Pulsar,pulsar

3.8.3.在ClickHouse中創(chuàng)建目標表

create database itcast_ck; 
use itcast_ck; 
create table itcast_ck.itcast_ck_ems( 
id int, 
sid varchar(128), 
ip varchar(128), 
create_time varchar(128), 
session_id varchar(128), 
yearInfo varchar(128), 
monthInfo varchar(128), 
dayInfo varchar(128), 
hourInfo varchar(128), 
seo_source varchar(128), 
area varchar(128), 
origin_channel varchar(128), 
msg_count int(128), 
from_url varchar(128), 
PRIMARY KEY (`id`) 
) ENGINE=ReplacingMergeTree();

3.8.4.編寫Flink代碼完成寫入到CK操作

import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.types.Row;

import java.sql.Types;
import java.util.Properties;

// 基于Flink完成讀取Pulsar中數(shù)據(jù)將消息數(shù)據(jù)寫入到clickhouse中
public class ItcastFlinkToClickHouse {

    public static void main(String[] args) throws Exception {
        //1. 創(chuàng)建Flinnk流式處理核心環(huán)境類對象 和 Table API 核心環(huán)境類對象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2. 添加Source組件, 從Pulsar中讀取消息數(shù)據(jù)
        Properties props = new Properties();
        props.setProperty("topic","persistent://public/default/itcast_ems_tab");
        props.setProperty("partition.discovery.interval-millis","5000");
        FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>(
                "pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",
                JsonDeser.of(PulsarTopicPojo.class),props);
        //2.1 設(shè)置pulsarSource組件在消費數(shù)據(jù)的時候, 默認從什么位置開始消費
        pulsarSource.setStartFromLatest();

        DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);


        //2.2  轉(zhuǎn)換數(shù)據(jù)操作: 將 PulsarTopicPojo 轉(zhuǎn)換為ROW對象
        SingleOutputStreamOperator<Row> rowDataSteam = dataStreamSource.map(new MapFunction<PulsarTopicPojo, Row>() {
            @Override
            public Row map(PulsarTopicPojo pulsarTopicPojo) throws Exception {

                return Row.of(pulsarTopicPojo.getId(), pulsarTopicPojo.getSid(), pulsarTopicPojo.getIp(), pulsarTopicPojo.getCreate_time(),
                        pulsarTopicPojo.getSession_id(), pulsarTopicPojo.getYearInfo(), pulsarTopicPojo.getMonthInfo(), pulsarTopicPojo.getDayInfo(),
                        pulsarTopicPojo.getHourInfo(), pulsarTopicPojo.getSeo_source(), pulsarTopicPojo.getArea(), pulsarTopicPojo.getOrigin_channel(),
                        pulsarTopicPojo.getMsg_count(), pulsarTopicPojo.getFrom_url());
            }
        });


        //2.3: 設(shè)置sink操作寫入到CK操作
        String insertSql = "insert into itcast_ck.itcast_ck_ems (id,sid,ip,create_time,session_id,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

        JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder()
                .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
                .setDBUrl("jdbc:clickhouse://node2:8123/itcast_ck")
                .setQuery(insertSql)
                .setBatchSize(1)
                .setParameterTypes(Types.INTEGER,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.VARCHAR,Types.INTEGER,Types.VARCHAR)
                .build();

        tableSink.emitDataStream(rowDataSteam);


        //3. 提交執(zhí)行
        env.execute("itcast_to_ck");

    }
}

3.9.HBase對接Phoenix實現(xiàn)即席查詢

3.9.1.Phoenix安裝操作

Phoenix是屬于apache旗下的一款基于hbase的工具, 此工具提供一種全新的方式來操作hbase中數(shù)據(jù)(SQL),
同時Phoenix對hbase進行大量的優(yōu)化工作, 能夠讓我們更加有效的操作hbase

整個安裝操作, 大家可以參考資料中安裝手冊, 進行安裝即可

3.9.2.在Phoenix中創(chuàng)建表

create view "itcast_h_ems" ( 
"id" integer primary key, 
"f1"."sid" varchar, 
"f1"."ip" varchar, 
"f1"."create_time" varchar, 
"f1"."session_id" varchar, 
"f1"."yearInfo" varchar, 
"f1"."monthInfo" varchar, 
"f1"."dayInfo" varchar, 
"f1"."hourInfo" varchar, 
"f1"."seo_source" varchar, 
"f1"."area" varchar, 
"f1"."origin_channel" varchar, 
"f1"."msg_count" integer, 
"f1"."from_url" varchar 
);

3.9.3.在Phoenix中類型說明

15_基于Flink將pulsar數(shù)據(jù)寫入到ClickHouse,# Apache Pulsar,pulsar文章來源地址http://www.zghlxwxcb.cn/news/detail-634454.html

到了這里,關(guān)于15_基于Flink將pulsar數(shù)據(jù)寫入到ClickHouse的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Apache-Pulsar安裝操作說明

    Apache-Pulsar安裝操作說明

    Pulsar 是一種用于服務(wù)器到服務(wù)器消息傳遞的多租戶高性能解決方案。 Pulsar 的主要特性如下: 對 Pulsar 實例中的多個集群的本機支持,并跨集群無縫地復(fù)制消息。 極低的發(fā)布和端到端延遲。 無縫可擴展至超過一百萬個主題。 一個簡單的客戶端 API,具有Java、Go、Python和C++的綁

    2024年04月14日
    瀏覽(27)
  • Pulsar's Integration with Apache Samza for Stateful Stream Processing

    隨著數(shù)據(jù)的增長和復(fù)雜性,流處理技術(shù)變得越來越重要。流處理系統(tǒng)允許實時分析大規(guī)模的、高速變化的數(shù)據(jù)流。Apache Pulsar 是一個高性能的分布式消息系統(tǒng),適用于流處理和批處理。Apache Samza 是一個用于有狀態(tài)流處理的系統(tǒng),它可以與 Pulsar 集成,以實現(xiàn)高效的狀態(tài)流處理

    2024年04月14日
    瀏覽(26)
  • Apache Pulsar 技術(shù)系列 - GEO replication 中訂閱狀態(tài)的同步原理

    Apache Pulsar 技術(shù)系列 - GEO replication 中訂閱狀態(tài)的同步原理

    Apache Pulsar 是一個多租戶、高性能的服務(wù)間消息傳輸解決方案,支持多租戶、低延時、讀寫分離、跨地域復(fù)制(GEO Replication)、快速擴容、靈活容錯等特性,GEO Replication 可以原生支持數(shù)據(jù)和訂閱狀態(tài)在多個集群之間進行復(fù)制,GEO 目前在 Apache InLong 內(nèi)部已經(jīng)有長期穩(wěn)定的實踐,

    2024年02月16日
    瀏覽(20)
  • 結(jié)合云計算的最新技術(shù)和現(xiàn)狀,介紹云計算基礎(chǔ)知識、開源分布式數(shù)據(jù)庫Clickhouse、可視化數(shù)據(jù)分析工具、分布式鏈路跟蹤系統(tǒng)Pinpoint、數(shù)據(jù)湖存儲系統(tǒng)Pulsar等

    作者:禪與計算機程序設(shè)計藝術(shù) 2019年,“云計算”將成為“經(jīng)濟全球化”的熱門詞匯之一,2020年全球云計算市場規(guī)模預(yù)計達到1萬億美元。中國是繼美國、英國之后,成為全球第四大云服務(wù)提供商。華為、騰訊、阿里巴巴等互聯(lián)網(wǎng)巨頭紛紛布局云計算領(lǐng)域,各家公司紛紛推出

    2024年02月08日
    瀏覽(30)
  • 消息隊列之六脈神劍:RabbitMQ、Kafka、ActiveMQ 、Redis、 ZeroMQ、Apache Pulsar對比和如何使用

    消息隊列(Message Queue)是一種異步通信機制,它將消息發(fā)送者和接收者解耦,從而提高了應(yīng)用程序的性能、可擴展性和可靠性。在分布式系統(tǒng)中,消息隊列經(jīng)常被用于處理高并發(fā)、異步處理、應(yīng)用解耦等場景。 本篇回答將分析比較常見的六種消息隊列:RabbitMQ、Kafka、Active

    2024年02月14日
    瀏覽(19)
  • 【Flink】【ClickHouse】寫入流式數(shù)據(jù)到ClickHouse

    【Flink】【ClickHouse】寫入流式數(shù)據(jù)到ClickHouse

    Flink 安裝的教程就不在這里贅敘了,可以看一下以前的文章,這篇文章主要是把流式數(shù)據(jù)寫入的OLAP(ClickHouse)中作查詢分析 Flink 1.13.2, ClickHouse?22.1.3.7 這里直接使用docker安裝,沒有安裝的同學可以使用homebreak來安裝,執(zhí)行下面的命令即可( 已經(jīng)安裝了docker的可以忽略 ) 四指

    2024年02月03日
    瀏覽(26)
  • Flink寫入數(shù)據(jù)到ClickHouse

    Flink寫入數(shù)據(jù)到ClickHouse

    1.ClickHouse建表 ClickHouse中建表 2.ClickHouse依賴 Flink開發(fā)相關(guān)依賴 3.Bean實體類 User.java 4.ClickHouse業(yè)務(wù)寫入邏輯 ClickHouseSinkFunction.java open():在SinkFunction實例化后調(diào)用,用于初始化連接或資源。這在處理每個并行任務(wù)的子任務(wù)之前只被調(diào)用一次。 invoke():定義了在每個元素到達Sink操

    2024年02月12日
    瀏覽(13)
  • 【Flink-Kafka-To-ClickHouse】使用 Flink 實現(xiàn) Kafka 數(shù)據(jù)寫入 ClickHouse

    需求描述: 1、數(shù)據(jù)從 Kafka 寫入 ClickHouse。 2、相關(guān)配置存放于 Mysql 中,通過 Mysql 進行動態(tài)讀取。 3、此案例中的 Kafka 是進行了 Kerberos 安全認證的,如果不需要自行修改。 4、先在 ClickHouse 中創(chuàng)建表然后動態(tài)獲取 ClickHouse 的表結(jié)構(gòu)。 5、Kafka 數(shù)據(jù)為 Json 格式,通過 FlatMap 扁平

    2024年02月03日
    瀏覽(23)
  • 與ChatGPT淺聊Pulsar

    我 : 艾米麗,談一談你對Pulsar的理解? ChatGPT : 當然可以!Apache Pulsar是一款分布式消息中間件,它支持多種消息模式,包括發(fā)布/訂閱模式、隊列模式和流模式。在發(fā)布/訂閱模式下,消息發(fā)布者將消息發(fā)布到一個主題中,訂閱者可以訂閱該主題,并接收到所有發(fā)布到該主題

    2023年04月15日
    瀏覽(20)
  • Pulsar-架構(gòu)與設(shè)計

    Pulsar-架構(gòu)與設(shè)計

    隨著云原生的興起,對消息中間件的伸縮性和多租戶隔離有了更高的要求。現(xiàn)有的消息中間件不支持多租戶的隔離,但是有一定伸縮性,需要一定的遷移工具支持和手工操作。 Pulsar是下一代云原生分布式消息平臺,采用存儲和計算分離架構(gòu)設(shè)計,支持彈性伸縮,支持多租戶、

    2024年02月22日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包