3.8. Writing Pulsar Data to ClickHouse with Flink
Write a Flink job that sinks the message data into ClickHouse; the later metric calculations will be run on CK.
3.8.1. ClickHouse Basics
ClickHouse is a columnar database management system (DBMS) open-sourced by Russia's Yandex in 2016. It is written in C++ and designed for online analytical processing (OLAP): analytical reports can be generated in real time with SQL queries.
Conclusion: like many OLAP databases, ClickHouse answers single-table queries much faster than join queries, and in ClickHouse the gap between the two is even more pronounced than in most of its peers.
3.8.2. ClickHouse Installation Steps
For this project a single-node test installation (on node2) is sufficient; in production you would deploy the distributed cluster version instead.
- 1- Set up the yum repository
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
- 2- Install directly via yum
sudo yum install clickhouse-server clickhouse-client
- 3- Edit the configuration file
vim /etc/clickhouse-server/config.xml
Around line 178, uncomment the following line so the server listens on all interfaces (IPv4 and IPv6):
<listen_host>::</listen_host>
- 4- Start the clickhouse-server service
systemctl start clickhouse-server
Stop:
systemctl stop clickhouse-server
Restart:
systemctl restart clickhouse-server
- 5- Enter the client (-m enables multi-line SQL statements)
clickhouse-client -m
3.8.3. Creating the Target Table in ClickHouse
create database itcast_ck;
use itcast_ck;
create table itcast_ck.itcast_ck_ems(
    id Int32,
    sid String,
    ip String,
    create_time String,
    session_id String,
    yearInfo String,
    monthInfo String,
    dayInfo String,
    hourInfo String,
    seo_source String,
    area String,
    origin_channel String,
    msg_count Int32,
    from_url String
) ENGINE = ReplacingMergeTree()
ORDER BY id;
Note: ReplacingMergeTree deduplicates rows that share the same ORDER BY key, but only during background merges; to see fully deduplicated results at query time, use SELECT ... FINAL or run OPTIMIZE TABLE ... FINAL first.
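The yearInfo/monthInfo/dayInfo/hourInfo columns are evidently time parts broken out of create_time upstream. A minimal sketch of that derivation in plain Java (the "yyyy-MM-dd HH:mm:ss" timestamp format and the zero-padding are assumptions for illustration, not taken from the source):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimePartsSketch {
    // Derive the year/month/day/hour partition columns from a create_time string.
    // The input format "yyyy-MM-dd HH:mm:ss" is an assumption about the upstream data.
    static String[] timeParts(String createTime) {
        LocalDateTime t = LocalDateTime.parse(
                createTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        return new String[]{
                String.valueOf(t.getYear()),              // yearInfo
                String.format("%02d", t.getMonthValue()), // monthInfo
                String.format("%02d", t.getDayOfMonth()), // dayInfo
                String.format("%02d", t.getHour())        // hourInfo
        };
    }

    public static void main(String[] args) {
        String[] parts = timeParts("2021-09-25 14:03:07");
        System.out.println(String.join(",", parts)); // prints 2021,09,25,14
    }
}
```

Storing these parts as separate String columns trades some space for simpler grouping in the later year/month/day/hour metric queries.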
3.8.4. Writing the Flink Code that Sinks into CK
import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.types.Row;

import java.sql.Types;
import java.util.Properties;

// Read message data from Pulsar with Flink and write it into ClickHouse.
public class ItcastFlinkToClickHouse {
    public static void main(String[] args) throws Exception {
        // 1. Create the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Add the source component: read messages from Pulsar
        Properties props = new Properties();
        props.setProperty("topic", "persistent://public/default/itcast_ems_tab");
        props.setProperty("partition.discovery.interval-millis", "5000");
        FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<>(
                "pulsar://node1:6650,node2:6650,node3:6650",
                "http://node1:8080,node2:8080,node3:8080",
                JsonDeser.of(PulsarTopicPojo.class), props);

        // 2.1 Set where the source starts consuming by default
        pulsarSource.setStartFromLatest();
        DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);

        // 2.2 Transform: convert each PulsarTopicPojo into a Row
        SingleOutputStreamOperator<Row> rowDataStream = dataStreamSource.map(new MapFunction<PulsarTopicPojo, Row>() {
            @Override
            public Row map(PulsarTopicPojo pojo) throws Exception {
                return Row.of(pojo.getId(), pojo.getSid(), pojo.getIp(), pojo.getCreate_time(),
                        pojo.getSession_id(), pojo.getYearInfo(), pojo.getMonthInfo(), pojo.getDayInfo(),
                        pojo.getHourInfo(), pojo.getSeo_source(), pojo.getArea(), pojo.getOrigin_channel(),
                        pojo.getMsg_count(), pojo.getFrom_url());
            }
        });

        // 2.3 Sink: write the rows into ClickHouse over JDBC
        String insertSql = "insert into itcast_ck.itcast_ck_ems (id,sid,ip,create_time,session_id,"
                + "yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) "
                + "values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
        JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder()
                .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
                .setDBUrl("jdbc:clickhouse://node2:8123/itcast_ck")
                .setQuery(insertSql)
                .setBatchSize(1) // flush every record; fine for testing, raise in production
                .setParameterTypes(Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
                        Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
                        Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.VARCHAR)
                .build();
        tableSink.emitDataStream(rowDataStream);

        // 3. Submit the job
        env.execute("itcast_to_ck");
    }
}
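`setBatchSize(1)` above issues a JDBC insert for every single record, which is easy to reason about but expensive: ClickHouse strongly prefers fewer, larger inserts. The buffering idea behind a larger batch size can be sketched in plain Java (the class below is illustrative only, not part of the Flink API; a list of batches stands in for the real JDBC connection):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative buffered sink: accumulates rows and "flushes" them in batches.
// In the real job, flush() would execute one batched JDBC insert instead.
public class BufferedSink {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> flushed = new ArrayList<>(); // stands in for ClickHouse

    BufferedSink(int batchSize) { this.batchSize = batchSize; }

    void write(String row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    void flush() {
        if (!buffer.isEmpty()) {
            flushed.add(new ArrayList<>(buffer)); // one batched insert
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BufferedSink sink = new BufferedSink(3);
        for (int i = 1; i <= 7; i++) {
            sink.write("row-" + i);
        }
        sink.flush(); // flush the trailing partial batch on shutdown
        System.out.println(sink.flushed.size()); // prints 3 (batches of 3, 3 and 1)
    }
}
```

With JDBCAppendTableSink this is what a larger `setBatchSize(n)` does internally; the trade-off is that up to n-1 records can sit unflushed until the next write arrives.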
3.9. Integrating HBase with Phoenix for Ad-hoc Queries
3.9.1. Phoenix Installation
Phoenix is an Apache project built on top of HBase. It provides a new way to work with HBase data (SQL), and it also applies many optimizations to HBase access, letting us operate on HBase much more efficiently.
For the installation itself, follow the installation guide in the course materials.
3.9.2. Creating the Table in Phoenix
create view "itcast_h_ems" (
"id" integer primary key,
"f1"."sid" varchar,
"f1"."ip" varchar,
"f1"."create_time" varchar,
"f1"."session_id" varchar,
"f1"."yearInfo" varchar,
"f1"."monthInfo" varchar,
"f1"."dayInfo" varchar,
"f1"."hourInfo" varchar,
"f1"."seo_source" varchar,
"f1"."area" varchar,
"f1"."origin_channel" varchar,
"f1"."msg_count" integer,
"f1"."from_url" varchar
);
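With the view in place, the HBase rows become queryable with standard SQL. A couple of illustrative ad-hoc queries (the filter values are made up; note that a Phoenix view mapped onto an existing HBase table is read-only, so data is still written via HBase/Flink rather than UPSERT):

```sql
-- Quoted identifiers preserve the lower-case names used in the view definition.
SELECT "area", COUNT(*) AS msg_total
FROM "itcast_h_ems"
GROUP BY "area"
ORDER BY msg_total DESC;

SELECT "sid", "create_time", "msg_count"
FROM "itcast_h_ems"
WHERE "yearInfo" = '2021' AND "monthInfo" = '09';
```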
3.9.3. Type Notes for Phoenix
Phoenix maps SQL types onto HBase byte arrays: in the view above, VARCHAR corresponds to variable-length UTF-8 strings and INTEGER to a 4-byte integer encoding. Be aware that Phoenix's signed INTEGER encoding flips the sign bit relative to HBase's Bytes.toBytes(int), so numeric columns written by other HBase clients may need to be declared UNSIGNED_INT instead.