国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)到kafka】

這篇具有很好參考價(jià)值的文章主要介紹了【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)到kafka】。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

最近做了flume實(shí)時(shí)采集mysql數(shù)據(jù)到kafka的實(shí)驗(yàn),做個(gè)筆記,防止忘記
?。?!建議從頭看到尾,因?yàn)橐恍┖?jiǎn)單的東西我在前面提了,后面沒提。

Kafka搭建:https://blog.csdn.net/cjwfinal/article/details/120803013

flume搭建:https://blog.csdn.net/cjwfinal/article/details/120441503?spm=1001.2014.3001.5502


一、flume寫入當(dāng)前文件系統(tǒng)

題目:

編寫配置文件,設(shè)置文件夾mylogs為source位置,文件夾backup為sink寫入位置,實(shí)現(xiàn)對(duì)文件夾的數(shù)據(jù)備份。
新建兩個(gè)文本文本文件1.txt與2.txt,在1.txt中輸入Hello Flume,在2.txt中輸入hello flume將兩個(gè)文件拖入mylog,查看backup文件夾中出現(xiàn)的文件及其內(nèi)容。文件可用記事本打開

解題:

1.flume配置文件

#a1表示agent的名字 可以自定義
# 給sources(在一個(gè)agent里可以定義多個(gè)source)取個(gè)名字
a1.sources = r1
# 給channel個(gè)名字
a1.channels = c1
# 給channel個(gè)名字
a1.sinks = k1

# 對(duì)source進(jìn)行配置
# agent的名字.sources.source的名字.參數(shù) = 參數(shù)值

# source的類型 spoolDir(監(jiān)控一個(gè)目錄下的文件的變化)
a1.sources.r1.type = spooldir
# 監(jiān)聽哪一個(gè)目錄
a1.sources.r1.spoolDir = /root/mylogs
# 是否在event的headers中保存文件的絕對(duì)路徑
a1.sources.r1.fileHeader = true
# 給攔截器取個(gè)名字 i1
a1.sources.r1.interceptors = i1
# 使用timestamp攔截器,將處理數(shù)據(jù)的時(shí)間保存到event的headers中
a1.sources.r1.interceptors.i1.type = timestamp

# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置sink為logger
# 直接打印到控制臺(tái)
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /root/backup

# 將source、channel、sink組裝成agent
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.要保證配置文件中的文件的路徑都存在,否則會(huì)報(bào)錯(cuò)

3.啟動(dòng)flume

啟動(dòng)命令:
flume-ng agent -n a1 -f spoolingtest.cong -Dflume.root.logger=DEBUG,console

a1是agent的名字,spoolingtest.cong是配置文件名,如果不一樣請(qǐng)修改

【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)到kafka】

4.測(cè)試

1.啟動(dòng)flume

注意:要在配置文件所在的目錄啟動(dòng)

2.在mylogs文件夾下創(chuàng)建兩個(gè)文件1.txt,2.txt,分別寫入hello world和Hello World

寫入并保存,后flume打印出來的日志會(huì)發(fā)生改變。

3.查看backup文件夾

backup文件夾中,會(huì)產(chǎn)生許多文件,其中某個(gè)文件中會(huì)保存上面兩個(gè)文件的內(nèi)容,自行查找即可。
【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)到kafka】

二、flume連接kafka

題目

把flume監(jiān)聽到的文件內(nèi)容,輸入到,kafka,并由kafka的消費(fèi)著消費(fèi)出來。

解題

1.flume配置文件

 
# a1表示agent的名字 可以自定義
# # 給sources(在一個(gè)agent里可以定義多個(gè)source)取個(gè)名字
a1.sources = r1
# # 給channel個(gè)名字
a1.channels = c1
# # 給channel個(gè)名字
a1.sinks = k1
# 對(duì)source進(jìn)行配置
# agent的名字.sources.source的名字.參數(shù) = 參數(shù)值

# source的類型 spoolDir(監(jiān)控一個(gè)目錄下的文件的變化)
a1.sources.r1.type = exec
# 監(jiān)聽哪一個(gè)目錄
a1.sources.r1.command = tail -F -c +0 /root/dir1/test.log
# 是否在event的headers中保存文件的絕對(duì)路徑
a1.sources.r1.fileHeader = true
# 給攔截器取個(gè)名字 i1
a1.sources.r1.interceptors = i1
# 使用timestamp攔截器,將處理數(shù)據(jù)的時(shí)間保存到event的headers中
a1.sources.r1.interceptors.i1.type = timestamp

# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 配置sinks,輸出到kafka中
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# topic是指定kafka的主題
a1.sinks.k1.kafka.topic = events
# master是主機(jī)名,也可以是主機(jī)IP地址
a1.sinks.k1.kafka.bootstrap.servers = master:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# 將source、channel、sink組裝成agent![在這里插入圖片描述](https://img-blog.csdnimg.cn/13519938e4ba42b9a4294454f2013dc2.png#pic_center)

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2測(cè)試

1.創(chuàng)建指定的目錄/root/dir1/test.log
2.啟動(dòng)flume
flume-ng agent -n a1 -f spoolingtest.cong -Dflume.root.logger=DEBUG,console
3.啟動(dòng)kafka的消費(fèi)者
啟動(dòng)命令
#啟動(dòng)消費(fèi)者之前首先要?jiǎng)?chuàng)建主題
kafka-topics.sh --create --topic events --bootstrap-server master:9092
#啟動(dòng)消費(fèi)者
kafka-console-consumer.sh --bootstrap-server master:9092 --topic events --from-beginning
kafka主題操作命令
#刪除主題
kafka-topics.sh --delete --topic events --zookeeper localhost:2181
#查看主題列表
kafka-topics.sh --list --bootstrap-server master:9092
4.在test.log中寫入數(shù)據(jù)

【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)到kafka】

5.觀察kafka的消費(fèi)是否消費(fèi)出數(shù)據(jù)

【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)到kafka】

三、flume實(shí)時(shí)采集mysql數(shù)據(jù)到kafka

題目

在MySQL中建立數(shù)據(jù)庫(kù)school,在數(shù)據(jù)庫(kù)中建立表student。SQL語句如下:

#創(chuàng)建數(shù)據(jù)庫(kù)
create database school;
#使用數(shù)據(jù)庫(kù)
use school;
#創(chuàng)建數(shù)據(jù)表
create table student(
	id int not null,
	name varchar(40),
	age int,
	grade int,
	primary key(id)
);
# 刪除數(shù)據(jù)
drop database school;
# 刪除數(shù)據(jù)表
drop database student;
# 查看數(shù)據(jù)表
show tables;

編寫配置文件,將student表中的內(nèi)容輸出到控制臺(tái)。啟動(dòng)Flume,在student表中使用下列命令插入數(shù)據(jù),在Kafka消費(fèi)者中查看相應(yīng)數(shù)據(jù)。

#插入數(shù)據(jù),sql
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);

解題

1.flume配置文件

# # 給sources(在一個(gè)agent里可以定義多個(gè)source)取個(gè)名字
a1.sources = r1
# # 給channel個(gè)名字
a1.channels = c1
# # 給channel個(gè)名字
a1.sinks = k1

#配置source
a1.sources.r1.type = org.keedio.flume.source.SQLSource
#ip地址和數(shù)據(jù)庫(kù)名稱需要修改,如果連接的本地的mysql改成本機(jī)IP地址,如果連接虛擬機(jī)上的mysql改成虛擬機(jī)的IP地址
a1.sources.r1.hibernate.connection.url = jdbc:mysql://ip地址/數(shù)據(jù)名稱?useSSL=false&allowPublicKeyRetrieval=true
# mysql用戶名,一般都是root
a1.sources.r1.hibernate.connection.user = root
# 密碼
a1.sources.r1.hibernate.connection.password = 123456
a1.sources.r1.hibernate.connection.autocommit = true
# mysql驅(qū)動(dòng)
a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
# 驅(qū)動(dòng)版本過低會(huì)無法使用,驅(qū)動(dòng)安裝下文會(huì)提及
a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.r1.run.query.delay=5000

# 自定義查詢
a1.sources.r1.start.from = 0
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *

a1.sources.r1.batch.size = 1000
a1.sources.r1.max.rows = 1000
a1.sources.r1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.r1.hibernate.c3p0.min_size=1
a1.sources.r1.hibernate.c3p0.max_size=10
# 存放status文件
a1.sources.r1.status.file.path = /root/dir1
a1.sources.r1.status.file.name = r1.status
# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
# 配置sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 指定主題名topic
a1.sinks.k1.kafka.topic = events
a1.sinks.k1.kafka.bootstrap.servers = master:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# 將source、channel、sink組裝成agent
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.要保證配置中的文件路徑存在,并且根據(jù)注釋做出相應(yīng)的修改

3.導(dǎo)入jar包到/flume/lib文件夾下

# flume連接mysqljar包
flume-ng-sql-source-1.4.3.jar
# mysql驅(qū)動(dòng)jar包
mysql-connector-java-5.1.49.jar

5.flume-ng-sql-source項(xiàng)目的官方網(wǎng)址

https://github.com/keedio/flume-ng-sql-source

6.測(cè)試

1.創(chuàng)建mysql數(shù)據(jù)庫(kù)、數(shù)據(jù)表
#創(chuàng)建數(shù)據(jù)庫(kù)
create database school;
#使用數(shù)據(jù)庫(kù)
use school;
#創(chuàng)建數(shù)據(jù)表
create table student(
	id int not null,
	name varchar(40),
	age int,
	grade int,
	primary key(id)
);
2.啟動(dòng)flume
flume-ng agent -n a1 -f spoolingtest.cong -Dflume.root.logger=DEBUG,console
3.啟動(dòng)kafka消費(fèi)者
#啟動(dòng)消費(fèi)者之前首先要?jiǎng)?chuàng)建主題
kafka-topics.sh --create --topic events --bootstrap-server master:9092
#啟動(dòng)消費(fèi)者
kafka-console-consumer.sh --bootstrap-server master:9092 --topic events --from-beginning
4.student表中插入數(shù)據(jù)
#插入數(shù)據(jù),sql
insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
insert into student(id,name,age,grade)value(3,'Lisi',24,93);
insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
5.觀察數(shù)據(jù)

【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)到kafka】

【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)到kafka】
為了方便測(cè)試我重復(fù)插入了一條數(shù)據(jù)

6.遇到的問題:消費(fèi)者沒有消費(fèi)出來數(shù)據(jù),且flume配置文件沒錯(cuò)
解決

刪除存放的status文件,也就是我配置文件中/root/dir1文件夾中的東西
【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)到kafka】

四、Kafka鏈接Flume,將生產(chǎn)的消息存入到HDFS

題目

編寫配置文件,將kafka作為輸入,在生產(chǎn)者中輸入“HelloFlume”或其他信息,通過Flume將Kafka生產(chǎn)者輸入的信息存入hdfs中,存儲(chǔ)格式hdfs://localhost:9000/fromkafka/%Y%m%d/,要求存儲(chǔ)時(shí)文件名為kafka_log。

解答

1.flume配置文件


agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = hdfs_sink
# 以下配置 source
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
# master為主機(jī)名或者主機(jī)IP
agent.sources.kafka_source.kafka.bootstrap.servers = master:9092
# 需要指定kafka生產(chǎn)者的主題
agent.sources.kafka_source.kafka.topics = kafka-flume
# 以下配置 sink
agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.channel = mem_channel
# 輸出到HDFS文件的路徑
agent.sinks.hdfs_sink.hdfs.path = /data/kafka-flume/%Y%m%d
# HDFS文件前綴
agent.sinks.hdfs_sink.hdfs.filePrefix = kafka_log
# 使用時(shí)間戳
a1.sinks.r1.hdfs.useLocalTimeStamp = true

agent.sinks.hdfs_sink.hdfs.rollSize = 0
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType=DataStream
agent.sinks.hdfs_sink.hdfs.writeFormat=Text
# 以下配置 channel
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100000
agent.channels.mem_channel.transactionCapacity = 10000

2.根據(jù)配置文件的注釋根據(jù)自己的情況做出修改

3.hadoop操作文件夾命令

# 創(chuàng)建文件夾
hadoop fs -mkdir 文件夾路徑
# 刪除文件夾
hadoop fs -rm -r -skipTrash 文件夾路徑

4.測(cè)試

1.啟動(dòng)flume
#啟動(dòng)命令
flume-ng agent -n agent -f kafka-flume-hdfs.cong -Dflume.root.logger=DEBUG,console
# 參數(shù)解釋
第二個(gè)agent為配置文件中agent的名字,kafka-flume-hdfs.cong為配置文件名,根據(jù)自己情況做出修改
2.啟動(dòng)kafka生產(chǎn)者
# 啟動(dòng)命令,根據(jù)自己配置變換主題名
kafka-console-producer.sh --broker-list master:9092 --topic kafka-flume
# 啟動(dòng)后,輸入HelloFlume
3.觀察HDFS界面,尋找并下載文件,查看內(nèi)容

【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)到kafka】

記得關(guān)閉flume后再下載,不然會(huì)下載出來.tmp文件文章來源地址http://www.zghlxwxcb.cn/news/detail-437003.html

有什么問題評(píng)論區(qū)提問!

到了這里,關(guān)于【flume實(shí)時(shí)采集mysql數(shù)據(jù)庫(kù)的數(shù)據(jù)到kafka】的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flume學(xué)習(xí)-采集端口數(shù)據(jù)存入kafka

    Flume學(xué)習(xí)-采集端口數(shù)據(jù)存入kafka

    啟動(dòng)zookeeper、kafka并創(chuàng)建kafka主題 2、創(chuàng)建flume-kafka.conf配置文件 用于采集socket數(shù)據(jù)后存入kafka 在flume文件夾中的conf下新建flume-kafka.conf配置文件 設(shè)置監(jiān)聽本地端口10050 netcat發(fā)送的socket數(shù)據(jù),講采集到的數(shù)據(jù)存入kafka的hunter主題中 3、啟動(dòng)flume ./bin/flume-ng :?jiǎn)?dòng)Flume-ng二進(jìn)制文件。

    2024年02月03日
    瀏覽(27)
  • 使用Flume-KafkaSource實(shí)時(shí)采集Avro格式數(shù)據(jù)

    Flume是一個(gè)可靠、可擴(kuò)展且具有高可用性的分布式系統(tǒng),用于在大規(guī)模數(shù)據(jù)集群中進(jìn)行高效的日志聚合、收集和傳輸。Kafka是一個(gè)分布式流處理平臺(tái),用于處理高容量的實(shí)時(shí)數(shù)據(jù)流。本文將介紹如何使用Flume的KafkaSource來實(shí)時(shí)采集Avro格式的數(shù)據(jù),并提供相應(yīng)的源代碼。 首先,確

    2024年02月07日
    瀏覽(32)
  • 實(shí)時(shí)Flink的數(shù)據(jù)庫(kù)與Kafka集成優(yōu)化案例

    在現(xiàn)代數(shù)據(jù)處理系統(tǒng)中,實(shí)時(shí)數(shù)據(jù)處理和分析是至關(guān)重要的。Apache Flink是一個(gè)流處理框架,可以用于實(shí)時(shí)數(shù)據(jù)處理和分析。在許多場(chǎng)景下,F(xiàn)link需要與數(shù)據(jù)庫(kù)和Kafka等消息系統(tǒng)進(jìn)行集成,以實(shí)現(xiàn)更高效的數(shù)據(jù)處理。本文將討論Flink與數(shù)據(jù)庫(kù)和Kafka集成的優(yōu)化案例,并提供實(shí)際示

    2024年02月20日
    瀏覽(29)
  • 【Kafka+Flume+Mysql+Spark】實(shí)現(xiàn)新聞話題實(shí)時(shí)統(tǒng)計(jì)分析系統(tǒng)(附源碼)

    【Kafka+Flume+Mysql+Spark】實(shí)現(xiàn)新聞話題實(shí)時(shí)統(tǒng)計(jì)分析系統(tǒng)(附源碼)

    需要源碼請(qǐng)點(diǎn)贊關(guān)注收藏后評(píng)論區(qū)留言私信~~~ 新聞話題實(shí)時(shí)統(tǒng)計(jì)分析系統(tǒng)以搜狗實(shí)驗(yàn)室的用戶查詢?nèi)罩緸榛A(chǔ),模擬生成用戶查詢?nèi)罩?,通過Flume將日志進(jìn)行實(shí)時(shí)采集、匯集,分析并進(jìn)行存儲(chǔ)。利用Spark Streaming實(shí)時(shí)統(tǒng)計(jì)分析前20名流量最高的新聞話題,并在前端頁面實(shí)時(shí)顯示

    2024年02月06日
    瀏覽(21)
  • 【數(shù)倉(cāng)】通過Flume+kafka采集日志數(shù)據(jù)存儲(chǔ)到Hadoop

    【數(shù)倉(cāng)】通過Flume+kafka采集日志數(shù)據(jù)存儲(chǔ)到Hadoop

    【數(shù)倉(cāng)】基本概念、知識(shí)普及、核心技術(shù) 【數(shù)倉(cāng)】數(shù)據(jù)分層概念以及相關(guān)邏輯 【數(shù)倉(cāng)】Hadoop軟件安裝及使用(集群配置) 【數(shù)倉(cāng)】Hadoop集群配置常用參數(shù)說明 【數(shù)倉(cāng)】zookeeper軟件安裝及集群配置 【數(shù)倉(cāng)】kafka軟件安裝及集群配置 【數(shù)倉(cāng)】flume軟件安裝及配置 【數(shù)倉(cāng)】flum

    2024年03月17日
    瀏覽(28)
  • 大數(shù)據(jù)之使用Flume監(jiān)聽端口采集數(shù)據(jù)流到Kafka

    大數(shù)據(jù)之使用Flume監(jiān)聽端口采集數(shù)據(jù)流到Kafka

    前言 題目: 一、讀題分析 二、處理過程?? 1.先在Kafka中創(chuàng)建符合題意的Kafka的topic ?創(chuàng)建符合題意的Kafka的topic 2.寫出Flume所需要的配置文件 3.啟動(dòng)腳本然后啟動(dòng)Flume監(jiān)聽端口數(shù)據(jù)并傳到Kafka 啟動(dòng)flume指令 啟動(dòng)腳本,觀察Flume和Kafka的變化 三、重難點(diǎn)分析 總結(jié)? ????????本題

    2024年02月08日
    瀏覽(27)
  • 一百七十二、Flume——Flume采集Kafka數(shù)據(jù)寫入HDFS中(親測(cè)有效、附截圖)

    一百七十二、Flume——Flume采集Kafka數(shù)據(jù)寫入HDFS中(親測(cè)有效、附截圖)

    作為日志采集工具Flume,它在項(xiàng)目中最常見的就是采集Kafka中的數(shù)據(jù)然后寫入HDFS或者HBase中,這里就是用flume采集Kafka的數(shù)據(jù)導(dǎo)入HDFS中 kafka_2.13-3.0.0.tgz hadoop-3.1.3.tar.gz apache-flume-1.9.0-bin.tar.gz # cd ?/home/hurys/dc_env/flume190/conf # vi ?evaluation.properties ### Name agent, source, channels and sink ali

    2024年02月09日
    瀏覽(19)
  • 日志采集傳輸框架之 Flume,將監(jiān)聽端口數(shù)據(jù)發(fā)送至Kafka

    日志采集傳輸框架之 Flume,將監(jiān)聽端口數(shù)據(jù)發(fā)送至Kafka

    1、簡(jiǎn)介???????? ????????Flume 是 Cloudera 提供的一個(gè)高可用的,高可靠的,分布式的海量日志采集、聚合和傳 輸?shù)南到y(tǒng)。Flume 基于流式架構(gòu),主要有以下幾個(gè)部分組成。 ?主要組件介紹: 1)、 Flume Agent 是一個(gè) JVM 進(jìn)程,它以事件的形式將數(shù)據(jù)從源頭送至目的。Agent 主

    2024年01月22日
    瀏覽(29)
  • (二十八)大數(shù)據(jù)實(shí)戰(zhàn)——Flume數(shù)據(jù)采集之kafka數(shù)據(jù)生產(chǎn)與消費(fèi)集成案例

    (二十八)大數(shù)據(jù)實(shí)戰(zhàn)——Flume數(shù)據(jù)采集之kafka數(shù)據(jù)生產(chǎn)與消費(fèi)集成案例

    本節(jié)內(nèi)容我們主要介紹一下flume數(shù)據(jù)采集和kafka消息中間鍵的整合。通過flume監(jiān)聽nc端口的數(shù)據(jù),將數(shù)據(jù)發(fā)送到kafka消息的first主題中,然后在通過flume消費(fèi)kafka中的主題消息,將消費(fèi)到的消息打印到控制臺(tái)上。集成使用flume作為kafka的生產(chǎn)者和消費(fèi)者。關(guān)于nc工具、flume以及kafka的

    2024年02月09日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包