前言
Kafka 是由 Apache 軟件基金會(huì)開(kāi)發(fā)的一個(gè)開(kāi)源消息隊(duì)列平臺(tái),它是一種高性能、可擴(kuò)展、分布式的發(fā)布-訂閱消息系統(tǒng)。Kafka 的架構(gòu)被設(shè)計(jì)為高效、低延遲,并具有高吞吐量、持久性和可靠性。
在 Kafka 中,生產(chǎn)者將消息發(fā)布到主題(topic)中,消費(fèi)者則從主題中消費(fèi)消息,使用者可以將其看作一個(gè) highly scalable 分布式 commit log 或者消息系統(tǒng) (Messaging system),每個(gè)消息包含一個(gè) key,一個(gè) value 和一個(gè)額外的 timestamp。消息保留時(shí)間通過(guò)配置進(jìn)行控制,當(dāng)時(shí)間或空間滿了的時(shí)候就根據(jù)策略來(lái)清除老數(shù)據(jù),默認(rèn)情況下老數(shù)據(jù)只保存 7 天。
特點(diǎn):
1.高吞吐量:Kafka 在發(fā)布-訂閱消息方面具有非常高的性能。它可以幾乎實(shí)時(shí)地處理高速流入的大量數(shù)據(jù)。
實(shí)時(shí)處理:Kafka 能夠處理高達(dá)數(shù)以百萬(wàn)計(jì)的消息,并準(zhǔn)確地將消息排序和在群組內(nèi)進(jìn)行調(diào)度。
2.持久性和可靠性:與傳統(tǒng)的消息系統(tǒng)不同,Kafka 具有持久性和可靠性??蛻舳俗约禾峤划?dāng)前偏移量,避免了可能出現(xiàn)的重復(fù)讀取問(wèn)題。
3.可擴(kuò)展性:Kafka 可以在不繁瑣的配置或修改信息格式等環(huán)節(jié)就能進(jìn)行擴(kuò)展。
4.多樣化數(shù)據(jù)類(lèi)型和來(lái)源:通過(guò)使用支持多種編程語(yǔ)言和操作系統(tǒng)的 API,Kafka 可以連接到許多各種來(lái)源的應(yīng)用程序。
總之,Kafka 具有高性能、低時(shí)延,適合處理大規(guī)模物聯(lián)網(wǎng)設(shè)備、日志、報(bào)警信息、傳感器數(shù)據(jù)、消息等。
所以今天就來(lái)寫(xiě)一份關(guān)于熟悉Kafka的基本使用方法的實(shí)驗(yàn),希望可以與小伙伴們一起探討~~????
一、實(shí)驗(yàn)平臺(tái)
(1)操作系統(tǒng):Windows7及以上(我用的是Windows 11)
(2)Kafka版本:kafka_2.12-2.4.0
(3)MySQL版本:8.0
二、實(shí)驗(yàn)內(nèi)容
一、Kafka與MySQL的組合使用
1.實(shí)驗(yàn)要求
假設(shè)有一個(gè)學(xué)生表student,如下表所示,編寫(xiě)Python程序完成如下操作。
(1)讀取student表的數(shù)據(jù)內(nèi)容,將其轉(zhuǎn)換為JSON格式,發(fā)送給Kafka
(2)從Kafka中獲取JSON格式數(shù)據(jù),打印出來(lái)
sno | sname | ssex | sage |
---|---|---|---|
95001 | John | M | 23 |
95002 | Tom | M | 23 |
2.在MySQL中操作
(1)打開(kāi)MySQL
方式一:
方式二:
可以通過(guò) DOS 命令啟動(dòng) MySQL 服務(wù),windows+R,在搜索框中輸入cmd,進(jìn)去之后再輸入services.msc,就進(jìn)去服務(wù)系統(tǒng)里了,再啟動(dòng)就行
進(jìn)去以后輸入密碼就可以開(kāi)始執(zhí)行mysql語(yǔ)句了
(2)創(chuàng)建數(shù)據(jù)庫(kù)
create database school001;
(3)查看數(shù)據(jù)庫(kù)
show databases;
發(fā)現(xiàn)數(shù)據(jù)庫(kù)已經(jīng)被創(chuàng)建完成
(4)使用該數(shù)據(jù)庫(kù)
use school001;
(5)在該數(shù)據(jù)庫(kù)中創(chuàng)建student表
create table student(sno varchar(10),sname varchar(20),ssex char(2),sage int(5));
(6)查詢?cè)摂?shù)據(jù)庫(kù)中的student表
show tables;
(7)向student表中插入值
insert into student values("95001","John","M",23);
insert into student values("95002","Tom","M",23);
(8)查詢student表中的數(shù)據(jù)
select * from student;
查詢結(jié)果:
(到這里我們的student表就創(chuàng)建成功了!)????
3.安裝Kafka
簡(jiǎn)單介紹:
Kafka 的運(yùn)行需要 Java 環(huán)境的支持,因此,安裝 Kafka 前需要在 Windows 操作系統(tǒng)中安裝 JDK
訪問(wèn) Kafka 官網(wǎng),下載 Kafka 2.4.0的安裝文件 kafka 2.12-2.4.0.1gz,解壓縮到" C : \ "目錄下(也可以放到D盤(pán),不過(guò)最好放在D盤(pán)根目錄下,不然后續(xù)代碼容易報(bào)錯(cuò),我試過(guò))
因?yàn)?Katka 的運(yùn)行依賴(lài)于 Zookeeper ,因此,還需要下載并安裝 Zookeeper 。當(dāng)然, Kafka 也內(nèi)置了 Zookeeper 服務(wù),因此,也可以不額外安裝 Zookeeper ,直接使用內(nèi)置的Zookeeper 服務(wù)。為簡(jiǎn)單起見(jiàn),這里直接使用內(nèi)置的Zookeeper 服務(wù)。
win+r—>輸入cmd然后回車(chē)
輸入命令pip install kafka-python安裝python-kafka模塊
查看我們安裝的模塊的版本信息(出現(xiàn)kafka-python2.0.2表示我們安裝模塊成功)
具體怎么安裝可參考:kafka安裝部署
4.使用Kafka
在實(shí)驗(yàn)中要用到Kafka就要先啟動(dòng)它的Zookeeper服務(wù)和Kafka,且在實(shí)驗(yàn)過(guò)程中,千萬(wàn)不可以將其關(guān)閉,一旦關(guān)閉,服務(wù)就會(huì)停止
????
在 Windows 操作系統(tǒng)中打開(kāi)第1個(gè) cmd 命令行窗口,啟動(dòng) Zookeeper 服務(wù)
:
cd D:\kafka_2.12-2.4.0(這個(gè)是你安裝kafka的路徑)
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties
注意,執(zhí)行上面的命令以后, cmd 命令行窗口中會(huì)返回一堆信息,然后停住不動(dòng),沒(méi)有回到命令提示符狀態(tài)。這時(shí),不要誤以為是死機(jī),這表示 Zookeeper 服務(wù)器已經(jīng)啟動(dòng),正處于服務(wù)狀態(tài)。所以,不要關(guān)閉這個(gè) cmd 命令行窗口,一旦關(guān)閉, Zookeeper 服務(wù)就會(huì)停止
如圖:打開(kāi)第2個(gè) cmd 命令行窗口,然后輸入如下命令啟動(dòng) Kafka 服務(wù)
:
cd D:\kafka_2.12-2.4.0(這個(gè)是你安裝kafka的路徑)
.\bin\windows\kafka-server-start.bat .\config\server.Properties
同樣地,執(zhí)行上面的命令以后, cmd 命令行窗口中會(huì)返回一堆信息,然后停住不動(dòng),沒(méi)有回到命令提示符狀態(tài)。這時(shí),不要誤以為是死機(jī),這表示 Kafka 服務(wù)器已經(jīng)啟動(dòng),正處于服務(wù)狀態(tài)。所以,不要關(guān)閉這個(gè) cmd 命令行窗口,一旦關(guān)閉, Kafka 服務(wù)就會(huì)停止
若執(zhí)行上面的命令以后,如果啟動(dòng)失敗,并且出現(xiàn)提示信息"此時(shí)不應(yīng)有\(zhòng)QuickTime\QTSstem\QTJava.zip ",則需要把環(huán)境變量 CLASSPATH 的相關(guān)信息刪除。具體方法是,
右鍵單擊"計(jì)算機(jī)",再單擊"屬性"一"高級(jí)系統(tǒng)設(shè)置"一"環(huán)境變量",然后,找到變量 CLASSPATH ,把類(lèi)似下面的信息刪除:
C : Program Files (x86) QuickTime\QTSystem QTJava . zip
然后重新啟動(dòng)計(jì)算機(jī),讓配置修改生效。重新啟動(dòng)計(jì)算機(jī)以后,再次按照上面的方法啟動(dòng)Zookeeper和Kafka
為了測(cè)試 Kafka ,這里創(chuàng)建一個(gè)主題,名稱(chēng)為" topic_test ",其包含一個(gè)分區(qū),只有一個(gè)副本。在第3個(gè) cmd 命令行窗口中執(zhí)行如下命令
:
cd D:\kafka_2.12-2.4.0(這個(gè)是你安裝kafka的路徑)
.\bin\windows\kafka-topics.bat -- create -- zookeeper localhost:2181-- replication -
factor 1-- partitions 1-- topic topic_test
可以繼續(xù)執(zhí)行如下命令,查看 topic _ test 是否創(chuàng)建成功:
.\bin\windows\kafka-topics.bat -- list -- zookeeper localhost:2181
如果創(chuàng)建成功,就可以在執(zhí)行結(jié)果中看到 topic _ test繼續(xù)在第3個(gè) cmd 命令行窗口中執(zhí)行如下命令,創(chuàng)建一個(gè)生產(chǎn)者來(lái)產(chǎn)生消息
:
.\bin\windows\kafka-console-producer.bat -- broker-list localhost :9092 -topic topic_test
該命令執(zhí)行以后,屏幕上的光標(biāo)會(huì)持續(xù)閃爍,這時(shí),可以用鍵盤(pán)輸入一些內(nèi)容,例如:
I love Kafka
Kafka is good新建第4個(gè) cmd 命令行窗口,執(zhí)行如下命令來(lái)消費(fèi)消息
:
cd D:\kafka_2.12-2.4.0(這個(gè)是你安裝kafka的路徑)
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic_test --from-beginning
該命令執(zhí)行以后,屏幕上顯示剛才輸入的語(yǔ)句" I love Kafka “和” Kafka is good "
5.在PyCharm中操作
創(chuàng)建一個(gè).py文件,寫(xiě)入以下代碼,用于實(shí)現(xiàn)讀取student表的數(shù)據(jù)內(nèi)容,將其轉(zhuǎn)換為JSON格式,發(fā)送給Kafka的功能
# 運(yùn)行前先在win上啟動(dòng)zookeap和kafka
# 導(dǎo)入相關(guān)模塊
from kafka import KafkaProducer
import json
# 連接kafka json.dumps(v).encode('utf-8')將json格式的數(shù)摳轉(zhuǎn)挨為字節(jié)類(lèi)型,然后使用ut了-8進(jìn)行編碼
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# 定義一個(gè)json格式的數(shù)第,json格式以鍵值對(duì)形式保存數(shù)掂,每個(gè)鍵值對(duì)之間使用逗號(hào)隔開(kāi)
data = {
'sno': '95001',
'sname': 'John',
'ssex': 'M',
'sage': 23
}
# 發(fā)送數(shù)據(jù)
producer.send('test001', data)
# 關(guān)閉資源
producer.close()
運(yùn)行結(jié)果如下圖所示:
創(chuàng)建一個(gè).py文件,寫(xiě)入以下代碼,用于實(shí)現(xiàn)從Kafka中獲取JSON格式數(shù)據(jù),打印出來(lái)的功能
# 運(yùn)行前先在win上啟動(dòng)mysql
# 導(dǎo)入消費(fèi)模塊
import json
# 導(dǎo)入kafka的消費(fèi)模塊
from kafka import KafkaConsumer
import json
import pymysql.cursors
# 連接kafka
consumer = KafkaConsumer('test001', bootstrap_servers='localhost:9092', group_id=None, auto_offset_reset='earliest')
# 對(duì)獲取的數(shù)據(jù)進(jìn)行解析
for msg in consumer:
# 轉(zhuǎn)換為字符串類(lèi)型
msg1 = str(msg.value, encoding=('utf-8'))
# 將字符串的數(shù)據(jù)加載為字典
dict = json.loads(msg1)
# 連接數(shù)據(jù)庫(kù)
connect = pymysql.Connect(
host='localhost',
port=3306,
user='root',
passwd='xxxxxxxx',#這是你MySQL數(shù)據(jù)庫(kù)的密碼
db='school001',
charset='utf8'
)
# 獲取操作數(shù)摳庫(kù)的對(duì)象<游標(biāo)>
cursor = connect.cursor()
# 將數(shù)摳織存到mysqL(插入數(shù)擲)
# 定義sql語(yǔ)句
sql = "select * from student;"
# 將數(shù)據(jù)作為參數(shù)傳速給sqL,保存到hrgsql
cursor.execute(sql)
# 提交
connect.commit()
for row in cursor.fetchall():
print("sno:%s\tsname:%s\tssex:%s\tsage:%d" % row)
print("共查詢出", cursor.rowcount, '條數(shù)據(jù)')
connect.close()
運(yùn)行結(jié)果如下圖所示:
二、消費(fèi)者手動(dòng)提交
1.實(shí)驗(yàn)要求
生成一個(gè)data.json文件,內(nèi)容如下:
data = [
{“name”: “Tony”, “age”: 21, “hobbies”: [“basketball”, “tennis”]},
{“name”: “Lisa”, “age”: 20, “hobbies”: [“sing”, “dance”]},
]
根據(jù)上面給出的data.json文件,執(zhí)行如下操作。
(1)編寫(xiě)生產(chǎn)者程序,將JSON文件數(shù)據(jù)發(fā)送給Kafka。
(2)編寫(xiě)消費(fèi)者程序,讀取Kafka的JSON格式數(shù)據(jù),并手動(dòng)提交偏移量。
2.在PyCharm中操作
-
創(chuàng)建一個(gè)Test寫(xiě)入以下代碼,來(lái)實(shí)現(xiàn)生成data.json文件的功能
:
import json
data = [
{"name": "Tony", "age": 21, "hobbies": ["basketball", "tennis"]},
{"name": "Lisa", "age": 20, "hobbies": ["sing", "dance"]},
]
with open('../../data.json', 'w') as f:
json.dump(data, f)
創(chuàng)建一個(gè).py文件,編寫(xiě)生產(chǎn)者程序,來(lái)實(shí)現(xiàn)將JSON文件數(shù)據(jù)發(fā)送給Kafka的功能
# 可以使用 Python 的 json 模塊讀取 data.json 文件,并將數(shù)據(jù)轉(zhuǎn)換為字符串后發(fā)送給 Kafka
from kafka import KafkaProducer
import json
data = [
{
"name": "Tony",
"age": 21,
"hobbies": ["basketball", "tennis"]
},
{
"name": "Lisa",
"age": 20,
"hobbies": ["sing", "dance"]
}
]
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for item in data:
# 將數(shù)據(jù)轉(zhuǎn)換為字符串格式并發(fā)送給 Kafka 主題 test
message = json.dumps(item).encode('utf-8')
producer.send('test', value=message)
producer.close()
運(yùn)行結(jié)果如下圖所示:
創(chuàng)建一個(gè).py文件,編寫(xiě)消費(fèi)者程序,來(lái)實(shí)現(xiàn)讀取Kafka的JSON格式數(shù)據(jù),并手動(dòng)提交偏移量的功能
# 我們可以使用 Kafka 消費(fèi)者 API 進(jìn)行數(shù)據(jù)消費(fèi),并在處理完每個(gè)消息后手動(dòng)提交偏移量。
from kafka import KafkaConsumer
import json
# 配置 Kafka 消費(fèi)者,指定主題和分組等信息
consumer = KafkaConsumer(
'test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=False, # 禁止自動(dòng)提交偏移量
group_id='my-group')
# 循環(huán)消費(fèi) Kafka 消息
for message in consumer:
# 將傳入的二進(jìn)制消息內(nèi)容解碼為 JSON 格式的字符串
item = json.loads(message.value.decode('utf-8'))
print(item)
# 手動(dòng)提交偏移量,確保下次消費(fèi)時(shí)從正確的位置開(kāi)始
consumer.commit()
運(yùn)行結(jié)果如下圖所示:
三、Kafka消費(fèi)者訂閱分區(qū)
1.實(shí)驗(yàn)要求
在命令行窗口中啟動(dòng)Kafka后,手動(dòng)創(chuàng)建主題 “assign_topic” ,分區(qū)數(shù)量為2。具體命令如下:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic
根據(jù)上面給出的主題,完成如下操作。
(1)編寫(xiě)生產(chǎn)者程序,以通過(guò)唯一標(biāo)識(shí)符UUID作為消息,發(fā)送給主題 “ assign_topic” 。
(2)編寫(xiě)消費(fèi)者程序1,訂閱主題的分區(qū)0,只消費(fèi)分區(qū)0數(shù)據(jù)。
(3)編寫(xiě)消費(fèi)者程序2,訂閱主題的分區(qū)1,只消費(fèi)分區(qū)1數(shù)據(jù)。
2.在終端操作
首先要完成主題以及分區(qū)的創(chuàng)建才能編寫(xiě)程序,不然程序會(huì)報(bào)錯(cuò)
步驟:
- 使用windows+r,在彈窗中輸入cmd打開(kāi)終端
- 在終端中輸入命令,創(chuàng)建主題和分區(qū):
cd D:\kafka_2.12-2.4.0(這個(gè)是你安裝kafka的路徑)
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic
結(jié)果如下圖(這是我之前已經(jīng)創(chuàng)建好的結(jié)果圖):
3.在PyCharm中操作
-
創(chuàng)建一個(gè).py文件,寫(xiě)入以下代碼,用于實(shí)現(xiàn)編寫(xiě)生產(chǎn)者程序,以通過(guò)唯一標(biāo)識(shí)符UUID作為消息,發(fā)送給主題 “ assign_topic的功能
:
from kafka import KafkaProducer
import uuid
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
for i in range(5):
message = str(uuid.uuid4()).encode('utf-8')
producer.send('assign_topic', value=message)
producer.close()
運(yùn)行結(jié)果如下圖所示:
-
創(chuàng)建一個(gè).py文件,寫(xiě)入以下代碼,用于實(shí)現(xiàn)訂閱主題的分區(qū)0,只消費(fèi)分區(qū)0數(shù)據(jù)的功能
:
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=False,
consumer_timeout_ms=1000
)
consumer.assign([TopicPartition('assign_topic', 0)])
for message in consumer:
print("Partition 0 - Message value: {}".format(message.value))
consumer.close()
運(yùn)行結(jié)果如下圖所示:
-
創(chuàng)建一個(gè).py文件,寫(xiě)入以下代碼,用于實(shí)現(xiàn)訂閱主題的分區(qū)1,只消費(fèi)分區(qū)1數(shù)據(jù)的功能
:
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=False,
consumer_timeout_ms=1000
)
consumer.assign([TopicPartition('assign_topic', 1)])
for message in consumer:
print("Partition 1 - Message value: {}".format(message.value))
consumer.close()
運(yùn)行結(jié)果如下圖所示:
三、實(shí)驗(yàn)小bug
1. Kafka連接報(bào)錯(cuò):kafka.errors.NoBrokersAvailable: NoBrokersAvailable 是什么原因:?
答:是因?yàn)槌绦蜻\(yùn)行了多次的原因
把tmp文件和logs文件里面的東西都刪掉,就可以解決了2. 為什么消費(fèi)者程序1中有東西輸出而消費(fèi)者程序2中什么卻什么也沒(méi)輸出?
消費(fèi)者程序1和消費(fèi)者程序2是對(duì)同一個(gè)主題的兩個(gè)消費(fèi)者應(yīng)用程序??梢葬槍?duì)以下情況進(jìn)行分析。
在主題 assign_topic 中,Kafka有多個(gè)分區(qū),可用于并行處理消息。在這里,被消費(fèi)的消息都來(lái)自此主題的第一個(gè)分區(qū)(即分區(qū) 0)。
消費(fèi)者程序1使用了 .subscribe() 方法來(lái)訂閱主題,這將導(dǎo)致消費(fèi)者加入到消費(fèi)組中,然后通過(guò)負(fù)載均衡策略從所有分區(qū)接收消息。因此,消費(fèi)者程序1輸出打印了分區(qū) 0 中的消息。
消費(fèi)者程序2使用了 .assign() 方法手動(dòng)分配消費(fèi)者處理的分區(qū),而且只分配了主題 assign_topic 的第一個(gè)分區(qū)(即分區(qū) 0)。但是,由于該程序沒(méi)有運(yùn)行足夠長(zhǎng)的時(shí)間,并且沒(méi)有消費(fèi)到任何未提交的偏移量,所以當(dāng)應(yīng)用程序終止時(shí)不會(huì)向Kafka服務(wù)器發(fā)送任何提交請(qǐng)求,這就可能導(dǎo)致在下一次啟動(dòng)時(shí)重復(fù)消費(fèi)確認(rèn)過(guò)的消息。因此,在生產(chǎn)環(huán)境中,請(qǐng)務(wù)必根據(jù)具體情況定期地提交所消費(fèi)的分區(qū)的偏移量。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-732112.html
總結(jié)
以上就是對(duì)Kafka的基本使用方法的實(shí)驗(yàn)啦,有不明白的地方可以留言哦,希望能共同進(jìn)步~~????????????文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-732112.html
到了這里,關(guān)于【超級(jí)詳細(xì)】熟悉Kafka的基本使用方法的實(shí)驗(yàn)【W(wǎng)indows】的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!