04:數(shù)據(jù)源
-
目標(biāo):了解數(shù)據(jù)源的格式及實(shí)現(xiàn)模擬數(shù)據(jù)的生成
-
路徑
- step1:數(shù)據(jù)格式
- step2:數(shù)據(jù)生成
-
實(shí)施
-
數(shù)據(jù)格式
消息時(shí)間 發(fā)件人昵稱 發(fā)件人賬號(hào) 發(fā)件人性別 發(fā)件人IP 發(fā)件人系統(tǒng) 發(fā)件人手機(jī)型號(hào) 發(fā)件人網(wǎng)絡(luò)制式 發(fā)件人GPS 收件人昵稱 收件人IP 收件人賬號(hào) 收件人系統(tǒng) 收件人手機(jī)型號(hào) 收件人網(wǎng)絡(luò)制式 收件人GPS 收件人性別 消息類型 雙方距離 消息 msg_time sender_nickyname sender_account sender_sex sender_ip sender_os sender_phone_type sender_network sender_gps receiver_nickyname receiver_ip receiver_account receiver_os receiver_phone_type receiver_network receiver_gps receiver_sex msg_type distance message 2020/05/08 15:11:33 古博易 14747877194 男 48.147.134.255 Android 8.0 小米 Redmi K30 4G 94.704577,36.247553 萊優(yōu) 97.61.25.52 17832829395 IOS 10.0 Apple iPhone 10 4G 84.034145,41.423804 女 TEXT 77.82KM 天涯海角惆悵渡,牛郎織女隔天河。佛祖座前長頓首,只求共度一百年。 -
數(shù)據(jù)生成
-
創(chuàng)建原始文件目錄
mkdir /export/data/momo_init
-
上傳模擬數(shù)據(jù)程序
cd /export/data/momo_init rz
-
創(chuàng)建模擬數(shù)據(jù)目錄
mkdir /export/data/momo_data
-
運(yùn)行程序生成數(shù)據(jù)
-
語法
java -jar /export/data/momo_init/MoMo_DataGen.jar 原始數(shù)據(jù)路徑 模擬數(shù)據(jù)路徑 隨機(jī)產(chǎn)生數(shù)據(jù)間隔ms時(shí)間
-
測(cè)試:每500ms生成一條數(shù)據(jù)
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 500
-
結(jié)果:生成模擬數(shù)據(jù)文件MOMO_DATA.dat,并且每條數(shù)據(jù)中字段分隔符為\001
-
-
-
-
小結(jié)文章來源地址http://www.zghlxwxcb.cn/news/detail-765674.html
- 了解數(shù)據(jù)源的格式及實(shí)現(xiàn)模擬數(shù)據(jù)的生成
05:技術(shù)架構(gòu)及技術(shù)選型
-
目標(biāo):掌握實(shí)時(shí)案例的技術(shù)架構(gòu)及技術(shù)選型
-
路徑
- step1:需求分析
- step2:技術(shù)選型
- step3:技術(shù)架構(gòu)
-
實(shí)施
-
需求分析
- 離線存儲(chǔ)計(jì)算
- 提供離線T + 1的統(tǒng)計(jì)分析
- 提供離線數(shù)據(jù)的即時(shí)查詢
- 實(shí)時(shí)存儲(chǔ)計(jì)算
- 提供實(shí)時(shí)統(tǒng)計(jì)分析
- 離線存儲(chǔ)計(jì)算
-
技術(shù)選型
- 離線
- 數(shù)據(jù)采集:Flume
- 離線存儲(chǔ):Hbase
- 離線分析:Hive:復(fù)雜計(jì)算
- 即時(shí)查詢:Phoenix:高效查詢
- 實(shí)時(shí)
- 數(shù)據(jù)采集:Flume
- 實(shí)時(shí)存儲(chǔ):Kafka
- 實(shí)時(shí)計(jì)算:Flink
- 實(shí)時(shí)應(yīng)用:MySQL + FineBI 或者 Redis + JavaWeb可視化
- 離線
-
技術(shù)架構(gòu)
- 為什么不直接將Flume的數(shù)據(jù)給Hbase,而統(tǒng)一的給了Kafka,再由Kafka到Hbase?
- 避免高并發(fā)寫導(dǎo)致機(jī)器負(fù)載過高、實(shí)現(xiàn)架構(gòu)解耦、實(shí)現(xiàn)異步高效
- 保證數(shù)據(jù)一致性
- 為什么不直接將Flume的數(shù)據(jù)給Hbase,而統(tǒng)一的給了Kafka,再由Kafka到Hbase?
-
-
小結(jié)
- 掌握實(shí)時(shí)案例的技術(shù)架構(gòu)及技術(shù)選型
06:Flume的回顧及安裝
-
目標(biāo):回顧Flume基本使用及實(shí)現(xiàn)Flume的安裝測(cè)試
-
路徑
- step1:Flume回顧
- step2:Flume的安裝
- step3:Flume的測(cè)試
-
實(shí)施
-
Flume的回顧
- 功能:實(shí)時(shí)對(duì)文件或者網(wǎng)絡(luò)端口進(jìn)行數(shù)據(jù)流監(jiān)聽采集
- 場景:文件實(shí)時(shí)采集
- 開發(fā)
- step1:先開發(fā)一個(gè)配置文件:properties【K=V】
- step2:運(yùn)行這個(gè)文件即可
- 組成
- Agent:一個(gè)Agent就是一個(gè)Flume程序
- Source:負(fù)責(zé)監(jiān)聽數(shù)據(jù)源,將數(shù)據(jù)源的動(dòng)態(tài)數(shù)據(jù)變成每一條Event數(shù)據(jù),將Event數(shù)據(jù)流放入Channel
- Channel:負(fù)責(zé)臨時(shí)存儲(chǔ)Source發(fā)送過來的數(shù)據(jù),供Sink來取數(shù)據(jù)
- Sink:負(fù)責(zé)從Channel拉取數(shù)據(jù)寫入目標(biāo)地
- Event:代表一條數(shù)據(jù)對(duì)象
- head:Map集合[KV]
- body:byte[]
-
Flume的安裝
-
上傳安裝包
cd /export/software/ rz
-
解壓安裝
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/ cd /export/server mv apache-flume-1.9.0-bin flume-1.9.0-bin
-
修改配置
#集成HDFS,拷貝HDFS配置文件 cd /export/server/flume-1.9.0-bin cp /export/server/hadoop/etc/hadoop/core-site.xml ./conf/ #修改Flume環(huán)境變量 cd /export/server/flume-1.9.0-bin/conf/ mv flume-env.sh.template flume-env.sh vim flume-env.sh
#修改22行 export JAVA_HOME=/export/server/jdk1.8.0_65 #修改34行 export HADOOP_HOME=/export/server/hadoop-3.3.0
-
刪除Flume自帶的guava包,替換成Hadoop的
cd /export/server/flume-1.9.0-bin rm -rf lib/guava-11.0.2.jar cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
-
創(chuàng)建目錄
cd /export/server/flume-1.9.0-bin #程序配置文件存儲(chǔ)目錄 mkdir usercase #Taildir元數(shù)據(jù)存儲(chǔ)目錄 mkdir position
-
-
Flume的測(cè)試
-
需求:采集聊天數(shù)據(jù),寫入HDFS
-
分析
- Source:taildir:動(dòng)態(tài)監(jiān)聽多個(gè)文件實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)采集
- Channel:mem:將數(shù)據(jù)緩存在內(nèi)存
- Sink:hdfs
-
開發(fā)
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
# define a1 a1.sources = s1 a1.channels = c1 a1.sinks = k1 #define s1 a1.sources.s1.type = TAILDIR #指定一個(gè)元數(shù)據(jù)記錄文件 a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json #將所有需要監(jiān)控的數(shù)據(jù)源變成一個(gè)組 a1.sources.s1.filegroups = f1 #指定了f1是誰:監(jiān)控目錄下所有文件 a1.sources.s1.filegroups.f1 = /export/data/momo_data/.* #指定f1采集到的數(shù)據(jù)的header中包含一個(gè)KV對(duì) a1.sources.s1.headers.f1.type = momo a1.sources.s1.fileHeader = true #define c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000 #define k1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d a1.sinks.k1.hdfs.fileType = DataStream #指定按照時(shí)間生成文件,一般關(guān)閉 a1.sinks.k1.hdfs.rollInterval = 0 #指定文件大小生成文件,一般120 ~ 125M對(duì)應(yīng)的字節(jié)數(shù) a1.sinks.k1.hdfs.rollSize = 102400 #指定event個(gè)數(shù)生成文件,一般關(guān)閉 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = momo a1.sinks.k1.hdfs.fileSuffix = .log a1.sinks.k1.hdfs.useLocalTimeStamp = true #bound a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
啟動(dòng)HDFS
start-dfs.sh
-
運(yùn)行Flume
cd /export/server/flume-1.9.0-bin bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
-
運(yùn)行模擬數(shù)據(jù)
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 100
-
查看結(jié)果
-
-
-
小結(jié)
- 回顧Flume基本使用及實(shí)現(xiàn)Flume的安裝測(cè)試
07:Flume采集程序開發(fā)
-
目標(biāo):實(shí)現(xiàn)案例Flume采集程序的開發(fā)
-
路徑
- step1:需求分析
- step2:程序開發(fā)
- step3:測(cè)試實(shí)現(xiàn)
-
實(shí)施
-
需求分析
-
需求:采集聊天數(shù)據(jù),實(shí)時(shí)寫入Kafka
-
Source:taildir
-
Channel:mem
-
Sink:Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.topic = mytopic a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.k1.kafka.producer.compression.type = snappy
-
-
程序開發(fā)
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
# define a1 a1.sources = s1 a1.channels = c1 a1.sinks = k1 #define s1 a1.sources.s1.type = TAILDIR #指定一個(gè)元數(shù)據(jù)記錄文件 a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json #將所有需要監(jiān)控的數(shù)據(jù)源變成一個(gè)組 a1.sources.s1.filegroups = f1 #指定了f1是誰:監(jiān)控目錄下所有文件 a1.sources.s1.filegroups.f1 = /export/data/momo_data/.* #指定f1采集到的數(shù)據(jù)的header中包含一個(gè)KV對(duì) a1.sources.s1.headers.f1.type = momo a1.sources.s1.fileHeader = true #define c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000 #define k1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = MOMO_MSG a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092 a1.sinks.k1.kafka.flumeBatchSize = 10 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 100 #bound a1.sources.s1.channels = c1 a1.sinks.k1.channel = c1
-
測(cè)試實(shí)現(xiàn)
-
啟動(dòng)Kafka
start-zk-all.sh start-kafka.sh
-
創(chuàng)建Topic
kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
注意:Kafka2.11版本用–zookeeper 替代
kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --zookeeper node01:9092 -
列舉
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
-
啟動(dòng)消費(fèi)者
kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
-
啟動(dòng)Flume程序
cd /export/server/flume-1.9.0-bin bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
-
啟動(dòng)模擬數(shù)據(jù)
java -jar /export/data/momo_init/MoMo_DataGen.jar \ /export/data/momo_init/MoMo_Data.xlsx \ /export/data/momo_data/ \ 50
-
觀察結(jié)果
文章來源:http://www.zghlxwxcb.cn/news/detail-765674.html
-
-
-
小結(jié)
- 實(shí)現(xiàn)案例Flume采集程序的開發(fā)
到了這里,關(guān)于基于Flume+Kafka+Hbase+Flink+FineBI的實(shí)時(shí)綜合案例(二)數(shù)據(jù)源的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!