3.7.基于Flink将数据写入到HBase
3.7.1.编写Flink程序完成数据写入到HBase的操作, 完成数据备份, 便于后续进行即席查询和离线分析
3.7.1.1.HBase基本介绍
HBase是基于Google发布的BigTable论文产生的一款软件, 是一款NoSQL型数据库: 不支持SQL, 不支持join操作, 没有表关系, 不支持事务(多行事务)。HBase基于HDFS, 采用Java语言编写。
查询HBase数据一般有三种方案: 主键(row key)查询, 主键的范围检索, 查询全部数据。
数据都是以字节类型存储, 可存储结构化和半结构化数据。
HBase表的特点: 大、面向列的存储方案、稀疏性。
3.7.1.2.应用场景
1)需要进行随机读写的操作。
2)数据量比较大。
3)数据比较稀疏。
3.7.1.3.HBase安装操作
本次安装的HBase版本为2.2.7, 详细的安装手册大家可以参考资料。还需要大家注意: HBase的启动需要依赖于ZooKeeper
和HDFS, 故需要先安装Hadoop与ZooKeeper。(文章来源:http://www.zghlxwxcb.cn/news/detail-635199.html)
- 1- 在HBase中创建目标表
# HBase shell: create table 'itcast_h_ems' with column family 'f1' (GZ compression),
# pre-split into 6 regions using the HexStringSplit algorithm.
# Fix: the table name literal was missing its closing quote.
create 'itcast_h_ems', {NAME=>'f1',COMPRESSION=>'GZ'},{NUMREGIONS=>6, SPLITALGO=>'HexStringSplit'}
- 2- 编写Flink代码完成写入HBase的操作
import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Properties;
/**
 * Consumes records from Pulsar with Flink and pours them into HBase, so that the data is
 * backed up and available for later ad-hoc queries and offline analysis.
 */
public class ItcastFlinkToHBase {

    /** First constructor argument of the Pulsar source: the {@code pulsar://} address list. */
    private static final String PULSAR_BINARY_URL = "pulsar://node1:6650,node2:6650,node3:6650";

    /**
     * Second constructor argument of the Pulsar source: the {@code http://} address list.
     * NOTE(review): presumably the Pulsar admin/REST endpoint - confirm against the connector.
     */
    private static final String PULSAR_HTTP_URL = "http://node1:8080,node2:8080,node3:8080";

    /** Name of the temporary view that exposes the Pulsar stream to SQL. */
    private static final String SOURCE_VIEW = "itcast_ems";

    /**
     * DDL of the HBase sink table: an INT row key plus a single column family {@code f1}.
     * NOTE(review): the row key is declared INT here while the HBase table is created with
     * HexStringSplit pre-splitting - confirm the key encoding actually spreads across regions.
     */
    private static final String HBASE_SINK_DDL =
            "create table itcast_h_ems("
                    + "rowkey int,"
                    + "f1 ROW<sid STRING,ip STRING,session_id STRING,create_time STRING,"
                    + "yearInfo STRING,monthInfo STRING,dayInfo STRING,hourInfo STRING,"
                    + "seo_source STRING,area STRING,origin_channel STRING,msg_count INT,from_url STRING>,"
                    + "primary key(rowkey) NOT ENFORCED"
                    + ") WITH ("
                    + "'connector'='hbase-2.2',"
                    + "'table-name'='itcast_h_ems',"
                    + "'zookeeper.quorum'='node1:2181,node2:2181,node3:2181'"
                    + ")";

    /** Copies every row of the source view into the sink: {@code id} is the row key, the rest goes to {@code f1}. */
    private static final String INSERT_INTO_HBASE =
            "insert into itcast_h_ems select id,"
                    + "ROW(sid,ip,session_id,create_time,yearInfo,monthInfo,dayInfo,hourInfo,"
                    + "seo_source,area,origin_channel,msg_count,from_url) "
                    + "from itcast_ems";

    /**
     * Wires the pipeline: Pulsar source, temporary view, HBase sink table, insert statement.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if any of the Flink calls fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the Flink streaming environment and the Table API environment built on it.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment sqlEnv = StreamTableEnvironment.create(streamEnv);

        // 2. Add the source component that reads messages from Pulsar.
        DataStreamSource<PulsarTopicPojo> records = streamEnv.addSource(newPulsarSource());

        // 2.2 Expose the stream as a Flink table (temporary view).
        sqlEnv.createTemporaryView(SOURCE_VIEW, records, sourceSchema());

        // 2.3 Define the HBase target table, then run the insert.
        sqlEnv.executeSql(HBASE_SINK_DDL);
        sqlEnv.executeSql(INSERT_INTO_HBASE);
    }

    /** Builds the Pulsar source for the {@code itcast_ems_tab} topic, deserializing JSON into {@link PulsarTopicPojo}. */
    private static FlinkPulsarSource<PulsarTopicPojo> newPulsarSource() {
        Properties sourceProps = new Properties();
        sourceProps.setProperty("topic", "persistent://public/default/itcast_ems_tab");
        // NOTE(review): presumably the partition re-discovery period in milliseconds - confirm.
        sourceProps.setProperty("partition.discovery.interval-millis", "5000");

        FlinkPulsarSource<PulsarTopicPojo> source = new FlinkPulsarSource<>(
                PULSAR_BINARY_URL, PULSAR_HTTP_URL, JsonDeser.of(PulsarTopicPojo.class), sourceProps);

        // 2.1 Choose the position from which the source starts consuming.
        source.setStartFromLatest();
        return source;
    }

    /** Declares the columns of the source view; names and types mirror the fields of {@link PulsarTopicPojo}. */
    private static Schema sourceSchema() {
        return Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("sid", DataTypes.STRING())
                .column("ip", DataTypes.STRING())
                .column("session_id", DataTypes.STRING())
                .column("create_time", DataTypes.STRING())
                .column("yearInfo", DataTypes.STRING())
                .column("monthInfo", DataTypes.STRING())
                .column("dayInfo", DataTypes.STRING())
                .column("hourInfo", DataTypes.STRING())
                .column("seo_source", DataTypes.STRING())
                .column("area", DataTypes.STRING())
                .column("origin_channel", DataTypes.STRING())
                .column("msg_count", DataTypes.INT())
                .column("from_url", DataTypes.STRING())
                .build();
    }
}
PulsarTopicPojo类的代码如下(文章来源地址:http://www.zghlxwxcb.cn/news/detail-635199.html):
/**
 * Mutable bean holding one record of the {@code itcast_ems} data set.
 *
 * <p>Every field is nullable; accessor names deliberately follow the field names
 * (including underscores) and must not be renamed.
 */
public class PulsarTopicPojo {

    private Integer id;
    private String sid;
    private String ip;
    private String session_id;
    private String create_time;
    private String yearInfo;
    private String monthInfo;
    private String dayInfo;
    private String hourInfo;
    private String seo_source;
    private String area;
    private String origin_channel;
    private Integer msg_count;
    private String from_url;

    /** Creates an empty record; every field starts as {@code null}. */
    public PulsarTopicPojo() {
    }

    /** Creates a record with every field set from the matching argument. */
    public PulsarTopicPojo(Integer id, String sid, String ip, String session_id, String create_time, String yearInfo, String monthInfo, String dayInfo, String hourInfo, String seo_source, String area, String origin_channel, Integer msg_count, String from_url) {
        assignAll(id, sid, ip, session_id, create_time, yearInfo, monthInfo, dayInfo, hourInfo,
                seo_source, area, origin_channel, msg_count, from_url);
    }

    /** Overwrites every field of this record with the matching argument. */
    public void setData(Integer id, String sid, String ip, String session_id, String create_time, String yearInfo, String monthInfo, String dayInfo, String hourInfo, String seo_source, String area, String origin_channel, Integer msg_count, String from_url) {
        assignAll(id, sid, ip, session_id, create_time, yearInfo, monthInfo, dayInfo, hourInfo,
                seo_source, area, origin_channel, msg_count, from_url);
    }

    /** Shared field assignment used by the full constructor and by {@link #setData}. */
    private void assignAll(Integer id, String sid, String ip, String session_id, String create_time,
                           String yearInfo, String monthInfo, String dayInfo, String hourInfo,
                           String seo_source, String area, String origin_channel,
                           Integer msg_count, String from_url) {
        this.id = id;
        this.sid = sid;
        this.ip = ip;
        this.session_id = session_id;
        this.create_time = create_time;
        this.yearInfo = yearInfo;
        this.monthInfo = monthInfo;
        this.dayInfo = dayInfo;
        this.hourInfo = hourInfo;
        this.seo_source = seo_source;
        this.area = area;
        this.origin_channel = origin_channel;
        this.msg_count = msg_count;
        this.from_url = from_url;
    }

    public Integer getId() {
        return this.id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getSid() {
        return this.sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    public String getIp() {
        return this.ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getSession_id() {
        return this.session_id;
    }

    public void setSession_id(String session_id) {
        this.session_id = session_id;
    }

    public String getCreate_time() {
        return this.create_time;
    }

    public void setCreate_time(String create_time) {
        this.create_time = create_time;
    }

    public String getYearInfo() {
        return this.yearInfo;
    }

    public void setYearInfo(String yearInfo) {
        this.yearInfo = yearInfo;
    }

    public String getMonthInfo() {
        return this.monthInfo;
    }

    public void setMonthInfo(String monthInfo) {
        this.monthInfo = monthInfo;
    }

    public String getDayInfo() {
        return this.dayInfo;
    }

    public void setDayInfo(String dayInfo) {
        this.dayInfo = dayInfo;
    }

    public String getHourInfo() {
        return this.hourInfo;
    }

    public void setHourInfo(String hourInfo) {
        this.hourInfo = hourInfo;
    }

    public String getSeo_source() {
        return this.seo_source;
    }

    public void setSeo_source(String seo_source) {
        this.seo_source = seo_source;
    }

    public String getArea() {
        return this.area;
    }

    public void setArea(String area) {
        this.area = area;
    }

    public String getOrigin_channel() {
        return this.origin_channel;
    }

    public void setOrigin_channel(String origin_channel) {
        this.origin_channel = origin_channel;
    }

    public Integer getMsg_count() {
        return this.msg_count;
    }

    public void setMsg_count(Integer msg_count) {
        this.msg_count = msg_count;
    }

    public String getFrom_url() {
        return this.from_url;
    }

    public void setFrom_url(String from_url) {
        this.from_url = from_url;
    }

    /**
     * Renders all fields as {@code PulsarTopicPojo{id=..., sid='...', ...}}; string fields are
     * wrapped in single quotes and {@code null} values print as {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("PulsarTopicPojo{");
        text.append("id=").append(id);
        text.append(", sid='").append(sid).append('\'');
        text.append(", ip='").append(ip).append('\'');
        text.append(", session_id='").append(session_id).append('\'');
        text.append(", create_time='").append(create_time).append('\'');
        text.append(", yearInfo='").append(yearInfo).append('\'');
        text.append(", monthInfo='").append(monthInfo).append('\'');
        text.append(", dayInfo='").append(dayInfo).append('\'');
        text.append(", hourInfo='").append(hourInfo).append('\'');
        text.append(", seo_source='").append(seo_source).append('\'');
        text.append(", area='").append(area).append('\'');
        text.append(", origin_channel='").append(origin_channel).append('\'');
        text.append(", msg_count=").append(msg_count);
        text.append(", from_url='").append(from_url).append('\'');
        return text.append('}').toString();
    }
}
到了这里,关于14_基于Flink将Pulsar数据写入到HBase的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!