最近做了flume實(shí)時(shí)采集mysql數(shù)據(jù)到kafka的實(shí)驗(yàn),做個(gè)筆記,防止忘記
?。?!建議從頭看到尾,因?yàn)橐恍┖?jiǎn)單的東西我在前面提了,后面沒提。
Kafka搭建:https://blog.csdn.net/cjwfinal/article/details/120803013
flume搭建:https://blog.csdn.net/cjwfinal/article/details/120441503?spm=1001.2014.3001.5502
一、flume寫入當(dāng)前文件系統(tǒng)
題目:
編寫配置文件,設(shè)置文件夾mylogs為source位置,文件夾backup為sink寫入位置,實(shí)現(xiàn)對(duì)文件夾的數(shù)據(jù)備份。
新建兩個(gè)文本文本文件1.txt與2.txt,在1.txt中輸入Hello Flume,在2.txt中輸入hello flume將兩個(gè)文件拖入mylog,查看backup文件夾中出現(xiàn)的文件及其內(nèi)容。文件可用記事本打開
解題:
1.flume配置文件
#a1表示agent的名字 可以自定義
# 給sources(在一個(gè)agent里可以定義多個(gè)source)取個(gè)名字
a1.sources = r1
# 給channel個(gè)名字
a1.channels = c1
# 給channel個(gè)名字
a1.sinks = k1
# 對(duì)source進(jìn)行配置
# agent的名字.sources.source的名字.參數(shù) = 參數(shù)值
# source的類型 spoolDir(監(jiān)控一個(gè)目錄下的文件的變化)
a1.sources.r1.type = spooldir
# 監(jiān)聽哪一個(gè)目錄
a1.sources.r1.spoolDir = /root/mylogs
# 是否在event的headers中保存文件的絕對(duì)路徑
a1.sources.r1.fileHeader = true
# 給攔截器取個(gè)名字 i1
a1.sources.r1.interceptors = i1
# 使用timestamp攔截器,將處理數(shù)據(jù)的時(shí)間保存到event的headers中
a1.sources.r1.interceptors.i1.type = timestamp
# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置sink為logger
# 直接打印到控制臺(tái)
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /root/backup
# 將source、channel、sink組裝成agent
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.要保證配置文件中的文件的路徑都存在,否則會(huì)報(bào)錯(cuò)
3.啟動(dòng)flume
啟動(dòng)命令:
flume-ng agent -n a1 -f spoolingtest.cong -Dflume.root.logger=DEBUG,console
a1是agent的名字,spoolingtest.cong是配置文件名,如果不一樣請(qǐng)修改
4.測(cè)試
1.啟動(dòng)flume
注意:要在配置文件所在的目錄啟動(dòng)
2.在mylogs文件夾下創(chuàng)建兩個(gè)文件1.txt,2.txt,分別寫入hello world和Hello World
寫入并保存,后flume打印出來的日志會(huì)發(fā)生改變。
3.查看backup文件夾
backup文件夾中,會(huì)產(chǎn)生許多文件,其中某個(gè)文件中會(huì)保存上面兩個(gè)文件的內(nèi)容,自行查找即可。
二、flume連接kafka
題目
把flume監(jiān)聽到的文件內(nèi)容,輸入到,kafka,并由kafka的消費(fèi)著消費(fèi)出來。
解題
1.flume配置文件
# a1表示agent的名字 可以自定義
# # 給sources(在一個(gè)agent里可以定義多個(gè)source)取個(gè)名字
a1.sources = r1
# # 給channel個(gè)名字
a1.channels = c1
# # 給channel個(gè)名字
a1.sinks = k1
# 對(duì)source進(jìn)行配置
# agent的名字.sources.source的名字.參數(shù) = 參數(shù)值
# source的類型 spoolDir(監(jiān)控一個(gè)目錄下的文件的變化)
a1.sources.r1.type = exec
# 監(jiān)聽哪一個(gè)目錄
a1.sources.r1.command = tail -F -c +0 /root/dir1/test.log
# 是否在event的headers中保存文件的絕對(duì)路徑
a1.sources.r1.fileHeader = true
# 給攔截器取個(gè)名字 i1
a1.sources.r1.interceptors = i1
# 使用timestamp攔截器,將處理數(shù)據(jù)的時(shí)間保存到event的headers中
a1.sources.r1.interceptors.i1.type = timestamp
# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置sinks,輸出到kafka中
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# topic是指定kafka的主題
a1.sinks.k1.kafka.topic = events
# master是主機(jī)名,也可以是主機(jī)IP地址
a1.sinks.k1.kafka.bootstrap.servers = master:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 將source、channel、sink組裝成agent
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2測(cè)試
1.創(chuàng)建指定的目錄/root/dir1/test.log
2.啟動(dòng)flume
flume-ng agent -n a1 -f spoolingtest.cong -Dflume.root.logger=DEBUG,console
3.啟動(dòng)kafka的消費(fèi)者
啟動(dòng)命令
#啟動(dòng)消費(fèi)者之前首先要?jiǎng)?chuàng)建主題
kafka-topics.sh --create --topic events --bootstrap-server master:9092
#啟動(dòng)消費(fèi)者
kafka-console-consumer.sh --bootstrap-server master:9092 --topic events --from-beginning
kafka主題操作命令
#刪除主題
kafka-topics.sh --delete --topic events --zookeeper localhost:2181
#查看主題列表
kafka-topics.sh --list --bootstrap-server master:9092
4.在test.log中寫入數(shù)據(jù)
5.觀察kafka的消費(fèi)是否消費(fèi)出數(shù)據(jù)
三、flume實(shí)時(shí)采集mysql數(shù)據(jù)到kafka
題目
在MySQL中建立數(shù)據(jù)庫(kù)school,在數(shù)據(jù)庫(kù)中建立表student。SQL語句如下:
#創(chuàng)建數(shù)據(jù)庫(kù)
create database school;
#使用數(shù)據(jù)庫(kù)
use school;
#創(chuàng)建數(shù)據(jù)表
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
# 刪除數(shù)據(jù)
drop database school;
# 刪除數(shù)據(jù)表
drop database student;
# 查看數(shù)據(jù)表
show tables;
編寫配置文件,將student表中的內(nèi)容輸出到控制臺(tái)。啟動(dòng)Flume,在student表中使用下列命令插入數(shù)據(jù),在Kafka消費(fèi)者中查看相應(yīng)數(shù)據(jù)。
#插入數(shù)據(jù),sql
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
解題
1.flume配置文件
# # 給sources(在一個(gè)agent里可以定義多個(gè)source)取個(gè)名字
a1.sources = r1
# # 給channel個(gè)名字
a1.channels = c1
# # 給channel個(gè)名字
a1.sinks = k1
#配置source
a1.sources.r1.type = org.keedio.flume.source.SQLSource
#ip地址和數(shù)據(jù)庫(kù)名稱需要修改,如果連接的本地的mysql改成本機(jī)IP地址,如果連接虛擬機(jī)上的mysql改成虛擬機(jī)的IP地址
a1.sources.r1.hibernate.connection.url = jdbc:mysql://ip地址/數(shù)據(jù)名稱?useSSL=false&allowPublicKeyRetrieval=true
# mysql用戶名,一般都是root
a1.sources.r1.hibernate.connection.user = root
# 密碼
a1.sources.r1.hibernate.connection.password = 123456
a1.sources.r1.hibernate.connection.autocommit = true
# mysql驅(qū)動(dòng)
a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
# 驅(qū)動(dòng)版本過低會(huì)無法使用,驅(qū)動(dòng)安裝下文會(huì)提及
a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.r1.run.query.delay=5000
# 自定義查詢
a1.sources.r1.start.from = 0
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
a1.sources.r1.batch.size = 1000
a1.sources.r1.max.rows = 1000
a1.sources.r1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.r1.hibernate.c3p0.min_size=1
a1.sources.r1.hibernate.c3p0.max_size=10
# 存放status文件
a1.sources.r1.status.file.path = /root/dir1
a1.sources.r1.status.file.name = r1.status
# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
# 配置sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 指定主題名topic
a1.sinks.k1.kafka.topic = events
a1.sinks.k1.kafka.bootstrap.servers = master:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 將source、channel、sink組裝成agent
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.要保證配置中的文件路徑存在,并且根據(jù)注釋做出相應(yīng)的修改
3.導(dǎo)入jar包到/flume/lib文件夾下
# flume連接mysqljar包
flume-ng-sql-source-1.4.3.jar
# mysql驅(qū)動(dòng)jar包
mysql-connector-java-5.1.49.jar
5.flume-ng-sql-source項(xiàng)目的官方網(wǎng)址
https://github.com/keedio/flume-ng-sql-source
6.測(cè)試
1.創(chuàng)建mysql數(shù)據(jù)庫(kù)、數(shù)據(jù)表
#創(chuàng)建數(shù)據(jù)庫(kù)
create database school;
#使用數(shù)據(jù)庫(kù)
use school;
#創(chuàng)建數(shù)據(jù)表
create table student(
id int not null,
name varchar(40),
age int,
grade int,
primary key(id)
);
2.啟動(dòng)flume
flume-ng agent -n a1 -f spoolingtest.cong -Dflume.root.logger=DEBUG,console
3.啟動(dòng)kafka消費(fèi)者
#啟動(dòng)消費(fèi)者之前首先要?jiǎng)?chuàng)建主題
kafka-topics.sh --create --topic events --bootstrap-server master:9092
#啟動(dòng)消費(fèi)者
kafka-console-consumer.sh --bootstrap-server master:9092 --topic events --from-beginning
4.student表中插入數(shù)據(jù)
#插入數(shù)據(jù),sql
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
5.觀察數(shù)據(jù)
為了方便測(cè)試我重復(fù)插入了一條數(shù)據(jù)
6.遇到的問題:消費(fèi)者沒有消費(fèi)出來數(shù)據(jù),且flume配置文件沒錯(cuò)
解決
刪除存放的status文件,也就是我配置文件中/root/dir1文件夾中的東西
四、Kafka鏈接Flume,將生產(chǎn)的消息存入到HDFS
題目
編寫配置文件,將kafka作為輸入,在生產(chǎn)者中輸入“HelloFlume”或其他信息,通過Flume將Kafka生產(chǎn)者輸入的信息存入hdfs中,存儲(chǔ)格式hdfs://localhost:9000/fromkafka/%Y%m%d/,要求存儲(chǔ)時(shí)文件名為kafka_log。
解答
1.flume配置文件
agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = hdfs_sink
# 以下配置 source
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
# master為主機(jī)名或者主機(jī)IP
agent.sources.kafka_source.kafka.bootstrap.servers = master:9092
# 需要指定kafka生產(chǎn)者的主題
agent.sources.kafka_source.kafka.topics = kafka-flume
# 以下配置 sink
agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.channel = mem_channel
# 輸出到HDFS文件的路徑
agent.sinks.hdfs_sink.hdfs.path = /data/kafka-flume/%Y%m%d
# HDFS文件前綴
agent.sinks.hdfs_sink.hdfs.filePrefix = kafka_log
# 使用時(shí)間戳
a1.sinks.r1.hdfs.useLocalTimeStamp = true
agent.sinks.hdfs_sink.hdfs.rollSize = 0
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType=DataStream
agent.sinks.hdfs_sink.hdfs.writeFormat=Text
# 以下配置 channel
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100000
agent.channels.mem_channel.transactionCapacity = 10000
2.根據(jù)配置文件的注釋根據(jù)自己的情況做出修改
3.hadoop操作文件夾命令
# 創(chuàng)建文件夾
hadoop fs -mkdir 文件夾路徑
# 刪除文件夾
hadoop fs -rm -r -skipTrash 文件夾路徑
4.測(cè)試
1.啟動(dòng)flume
#啟動(dòng)命令
flume-ng agent -n agent -f kafka-flume-hdfs.cong -Dflume.root.logger=DEBUG,console
# 參數(shù)解釋
第二個(gè)agent為配置文件中agent的名字,kafka-flume-hdfs.cong為配置文件名,根據(jù)自己情況做出修改
2.啟動(dòng)kafka生產(chǎn)者
# 啟動(dòng)命令,根據(jù)自己配置變換主題名
kafka-console-producer.sh --broker-list master:9092 --topic kafka-flume
# 啟動(dòng)后,輸入HelloFlume
3.觀察HDFS界面,尋找并下載文件,查看內(nèi)容
文章來源:http://www.zghlxwxcb.cn/news/detail-437003.html
記得關(guān)閉flume后再下載,不然會(huì)下載出來.tmp文件文章來源地址http://www.zghlxwxcb.cn/news/detail-437003.html
有什么問題評(píng)論區(qū)提問!
到了這里,關(guān)于【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)到kafka】的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!