国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【超級(jí)詳細(xì)】熟悉Kafka的基本使用方法的實(shí)驗(yàn)【W(wǎng)indows】

這篇具有很好參考價(jià)值的文章主要介紹了【超級(jí)詳細(xì)】熟悉Kafka的基本使用方法的實(shí)驗(yàn)【W(wǎng)indows】。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。


前言

Kafka 是由 Apache 軟件基金會(huì)開(kāi)發(fā)的一個(gè)開(kāi)源消息隊(duì)列平臺(tái),它是一種高性能、可擴(kuò)展、分布式的發(fā)布-訂閱消息系統(tǒng)。Kafka 的架構(gòu)被設(shè)計(jì)為高效、低延遲,并具有高吞吐量、持久性和可靠性。

在 Kafka 中,生產(chǎn)者將消息發(fā)布到主題(topic)中,消費(fèi)者則從主題中消費(fèi)消息,使用者可以將其看作一個(gè) highly scalable 分布式 commit log 或者消息系統(tǒng) (Messaging system),每個(gè)消息包含一個(gè) key,一個(gè) value 和一個(gè)額外的 timestamp。消息保留時(shí)間通過(guò)配置進(jìn)行控制,當(dāng)時(shí)間或空間滿了的時(shí)候就根據(jù)策略來(lái)清除老數(shù)據(jù),默認(rèn)情況下老數(shù)據(jù)只保存 7 天。

特點(diǎn):
1.高吞吐量:Kafka 在發(fā)布-訂閱消息方面具有非常高的性能。它可以幾乎實(shí)時(shí)地處理高速流入的大量數(shù)據(jù)。
實(shí)時(shí)處理:Kafka 能夠處理高達(dá)數(shù)以百萬(wàn)計(jì)的消息,并準(zhǔn)確地將消息排序和在群組內(nèi)進(jìn)行調(diào)度。
2.持久性和可靠性:與傳統(tǒng)的消息系統(tǒng)不同,Kafka 具有持久性和可靠性??蛻舳俗约禾峤划?dāng)前偏移量,避免了可能出現(xiàn)的重復(fù)讀取問(wèn)題。
3.可擴(kuò)展性:Kafka 可以在不繁瑣的配置或修改信息格式等環(huán)節(jié)就能進(jìn)行擴(kuò)展。
4.多樣化數(shù)據(jù)類(lèi)型和來(lái)源:通過(guò)使用支持多種編程語(yǔ)言和操作系統(tǒng)的 API,Kafka 可以連接到許多各種來(lái)源的應(yīng)用程序。

總之,Kafka 具有高性能、低時(shí)延,適合處理大規(guī)模物聯(lián)網(wǎng)設(shè)備、日志、報(bào)警信息、傳感器數(shù)據(jù)、消息等。

所以今天就來(lái)寫(xiě)一份關(guān)于熟悉Kafka的基本使用方法的實(shí)驗(yàn),希望可以與小伙伴們一起探討~~????


一、實(shí)驗(yàn)平臺(tái)

(1)操作系統(tǒng):Windows7及以上(我用的是Windows 11)
(2)Kafka版本:kafka_2.12-2.4.0
(3)MySQL版本:8.0

二、實(shí)驗(yàn)內(nèi)容

一、Kafka與MySQL的組合使用

1.實(shí)驗(yàn)要求

假設(shè)有一個(gè)學(xué)生表student,如下表所示,編寫(xiě)Python程序完成如下操作。
(1)讀取student表的數(shù)據(jù)內(nèi)容,將其轉(zhuǎn)換為JSON格式,發(fā)送給Kafka
(2)從Kafka中獲取JSON格式數(shù)據(jù),打印出來(lái)

sno sname ssex sage
95001 John M 23
95002 Tom M 23

2.在MySQL中操作

(1)打開(kāi)MySQL
方式一:
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言
方式二:

可以通過(guò) DOS 命令啟動(dòng) MySQL 服務(wù),windows+R,在搜索框中輸入cmd,進(jìn)去之后再輸入services.msc,就進(jìn)去服務(wù)系統(tǒng)里了,再啟動(dòng)就行
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言
進(jìn)去以后輸入密碼就可以開(kāi)始執(zhí)行mysql語(yǔ)句了
(2)創(chuàng)建數(shù)據(jù)庫(kù)

create database school001;

(3)查看數(shù)據(jù)庫(kù)

show databases;

發(fā)現(xiàn)數(shù)據(jù)庫(kù)已經(jīng)被創(chuàng)建完成
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言
(4)使用該數(shù)據(jù)庫(kù)

use school001;

(5)在該數(shù)據(jù)庫(kù)中創(chuàng)建student表

create table student(sno varchar(10),sname varchar(20),ssex char(2),sage int(5));

(6)查詢?cè)摂?shù)據(jù)庫(kù)中的student表

show tables;

(7)向student表中插入值

insert into student values("95001","John","M",23);
insert into student values("95002","Tom","M",23);

(8)查詢student表中的數(shù)據(jù)

select * from student;

查詢結(jié)果:
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言
(到這里我們的student表就創(chuàng)建成功了!)????

3.安裝Kafka

簡(jiǎn)單介紹:
Kafka 的運(yùn)行需要 Java 環(huán)境的支持,因此,安裝 Kafka 前需要在 Windows 操作系統(tǒng)中安裝 JDK
訪問(wèn) Kafka 官網(wǎng),下載 Kafka 2.4.0的安裝文件 kafka 2.12-2.4.0.1gz,解壓縮到" C : \ "目錄下(也可以放到D盤(pán),不過(guò)最好放在D盤(pán)根目錄下,不然后續(xù)代碼容易報(bào)錯(cuò),我試過(guò))
因?yàn)?Katka 的運(yùn)行依賴(lài)于 Zookeeper ,因此,還需要下載并安裝 Zookeeper 。當(dāng)然, Kafka 也內(nèi)置了 Zookeeper 服務(wù),因此,也可以不額外安裝 Zookeeper ,直接使用內(nèi)置的Zookeeper 服務(wù)。為簡(jiǎn)單起見(jiàn),這里直接使用內(nèi)置的Zookeeper 服務(wù)。

win+r—>輸入cmd然后回車(chē)

輸入命令pip install kafka-python安裝python-kafka模塊

查看我們安裝的模塊的版本信息(出現(xiàn)kafka-python2.0.2表示我們安裝模塊成功)

具體怎么安裝可參考:kafka安裝部署

4.使用Kafka

在實(shí)驗(yàn)中要用到Kafka就要先啟動(dòng)它的Zookeeper服務(wù)和Kafka,且在實(shí)驗(yàn)過(guò)程中,千萬(wàn)不可以將其關(guān)閉,一旦關(guān)閉,服務(wù)就會(huì)停止????
在 Windows 操作系統(tǒng)中
打開(kāi)第1個(gè) cmd 命令行窗口,啟動(dòng) Zookeeper 服務(wù)

cd D:\kafka_2.12-2.4.0(這個(gè)是你安裝kafka的路徑)
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties

注意,執(zhí)行上面的命令以后, cmd 命令行窗口中會(huì)返回一堆信息,然后停住不動(dòng),沒(méi)有回到命令提示符狀態(tài)。這時(shí),不要誤以為是死機(jī),這表示 Zookeeper 服務(wù)器已經(jīng)啟動(dòng),正處于服務(wù)狀態(tài)。所以,不要關(guān)閉這個(gè) cmd 命令行窗口,一旦關(guān)閉, Zookeeper 服務(wù)就會(huì)停止
如圖:
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言
打開(kāi)第2個(gè) cmd 命令行窗口,然后輸入如下命令啟動(dòng) Kafka 服務(wù)

cd D:\kafka_2.12-2.4.0(這個(gè)是你安裝kafka的路徑)
.\bin\windows\kafka-server-start.bat .\config\server.Properties

同樣地,執(zhí)行上面的命令以后, cmd 命令行窗口中會(huì)返回一堆信息,然后停住不動(dòng),沒(méi)有回到命令提示符狀態(tài)。這時(shí),不要誤以為是死機(jī),這表示 Kafka 服務(wù)器已經(jīng)啟動(dòng),正處于服務(wù)狀態(tài)。所以,不要關(guān)閉這個(gè) cmd 命令行窗口,一旦關(guān)閉, Kafka 服務(wù)就會(huì)停止
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言
若執(zhí)行上面的命令以后,如果啟動(dòng)失敗,并且出現(xiàn)提示信息"此時(shí)不應(yīng)有\(zhòng)QuickTime\QTSstem\QTJava.zip ",則需要把環(huán)境變量 CLASSPATH 的相關(guān)信息刪除。具體方法是,

右鍵單擊"計(jì)算機(jī)",再單擊"屬性"一"高級(jí)系統(tǒng)設(shè)置"一"環(huán)境變量",然后,找到變量 CLASSPATH ,把類(lèi)似下面的信息刪除:
C : Program Files (x86) QuickTime\QTSystem QTJava . zip

然后重新啟動(dòng)計(jì)算機(jī),讓配置修改生效。重新啟動(dòng)計(jì)算機(jī)以后,再次按照上面的方法啟動(dòng)Zookeeper和Kafka

為了測(cè)試 Kafka ,這里創(chuàng)建一個(gè)主題,名稱(chēng)為" topic_test ",其包含一個(gè)分區(qū),只有一個(gè)副本。在第3個(gè) cmd 命令行窗口中執(zhí)行如下命令

cd D:\kafka_2.12-2.4.0(這個(gè)是你安裝kafka的路徑)
.\bin\windows\kafka-topics.bat -- create -- zookeeper localhost:2181-- replication -
 factor 1-- partitions 1-- topic topic_test 

kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言
可以繼續(xù)執(zhí)行如下命令,查看 topic _ test 是否創(chuàng)建成功:

.\bin\windows\kafka-topics.bat -- list -- zookeeper localhost:2181

kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言
如果創(chuàng)建成功,就可以在執(zhí)行結(jié)果中看到 topic _ test
繼續(xù)在第3個(gè) cmd 命令行窗口中執(zhí)行如下命令,創(chuàng)建一個(gè)生產(chǎn)者來(lái)產(chǎn)生消息

.\bin\windows\kafka-console-producer.bat -- broker-list localhost :9092 -topic topic_test 

該命令執(zhí)行以后,屏幕上的光標(biāo)會(huì)持續(xù)閃爍,這時(shí),可以用鍵盤(pán)輸入一些內(nèi)容,例如:
I love Kafka
Kafka is good
新建第4個(gè) cmd 命令行窗口,執(zhí)行如下命令來(lái)消費(fèi)消息

cd D:\kafka_2.12-2.4.0(這個(gè)是你安裝kafka的路徑)
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic_test --from-beginning 

該命令執(zhí)行以后,屏幕上顯示剛才輸入的語(yǔ)句" I love Kafka “和” Kafka is good "

5.在PyCharm中操作

  1. 創(chuàng)建一個(gè).py文件,寫(xiě)入以下代碼,用于實(shí)現(xiàn)讀取student表的數(shù)據(jù)內(nèi)容,將其轉(zhuǎn)換為JSON格式,發(fā)送給Kafka的功能
# 運(yùn)行前先在win上啟動(dòng)zookeap和kafka
# 導(dǎo)入相關(guān)模塊
from kafka import KafkaProducer
import json

# 連接kafka  json.dumps(v).encode('utf-8')將json格式的數(shù)摳轉(zhuǎn)挨為字節(jié)類(lèi)型,然后使用ut了-8進(jìn)行編碼
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# 定義一個(gè)json格式的數(shù)第,json格式以鍵值對(duì)形式保存數(shù)掂,每個(gè)鍵值對(duì)之間使用逗號(hào)隔開(kāi)
data = {
    'sno': '95001',
    'sname': 'John',
    'ssex': 'M',
    'sage': 23
}
# 發(fā)送數(shù)據(jù)
producer.send('test001', data)
# 關(guān)閉資源
producer.close()

運(yùn)行結(jié)果如下圖所示:
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言

  1. 創(chuàng)建一個(gè).py文件,寫(xiě)入以下代碼,用于實(shí)現(xiàn)從Kafka中獲取JSON格式數(shù)據(jù),打印出來(lái)的功能
# 運(yùn)行前先在win上啟動(dòng)mysql
# 導(dǎo)入消費(fèi)模塊
import json
# 導(dǎo)入kafka的消費(fèi)模塊
from kafka import KafkaConsumer
import json
import pymysql.cursors

# 連接kafka
consumer = KafkaConsumer('test001', bootstrap_servers='localhost:9092', group_id=None, auto_offset_reset='earliest')
# 對(duì)獲取的數(shù)據(jù)進(jìn)行解析
for msg in consumer:
    # 轉(zhuǎn)換為字符串類(lèi)型
    msg1 = str(msg.value, encoding=('utf-8'))
    # 將字符串的數(shù)據(jù)加載為字典
    dict = json.loads(msg1)
    # 連接數(shù)據(jù)庫(kù)
    connect = pymysql.Connect(
        host='localhost',
        port=3306,
        user='root',
        passwd='xxxxxxxx',#這是你MySQL數(shù)據(jù)庫(kù)的密碼
        db='school001',
        charset='utf8'
    )
    # 獲取操作數(shù)摳庫(kù)的對(duì)象<游標(biāo)>
    cursor = connect.cursor()
    # 將數(shù)摳織存到mysqL(插入數(shù)擲)
    # 定義sql語(yǔ)句
    sql = "select * from student;"
    # 將數(shù)據(jù)作為參數(shù)傳速給sqL,保存到hrgsql
    cursor.execute(sql)
    # 提交
    connect.commit()
    for row in cursor.fetchall():
        print("sno:%s\tsname:%s\tssex:%s\tsage:%d" % row)
    print("共查詢出", cursor.rowcount, '條數(shù)據(jù)')
    connect.close()

運(yùn)行結(jié)果如下圖所示:
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言

二、消費(fèi)者手動(dòng)提交

1.實(shí)驗(yàn)要求

生成一個(gè)data.json文件,內(nèi)容如下:
data = [
{“name”: “Tony”, “age”: 21, “hobbies”: [“basketball”, “tennis”]},
{“name”: “Lisa”, “age”: 20, “hobbies”: [“sing”, “dance”]},
]
根據(jù)上面給出的data.json文件,執(zhí)行如下操作。
(1)編寫(xiě)生產(chǎn)者程序,將JSON文件數(shù)據(jù)發(fā)送給Kafka。
(2)編寫(xiě)消費(fèi)者程序,讀取Kafka的JSON格式數(shù)據(jù),并手動(dòng)提交偏移量。

2.在PyCharm中操作

  1. 創(chuàng)建一個(gè)Test寫(xiě)入以下代碼,來(lái)實(shí)現(xiàn)生成data.json文件的功能:
import json

data = [
    {"name": "Tony", "age": 21, "hobbies": ["basketball", "tennis"]},
    {"name": "Lisa", "age": 20, "hobbies": ["sing", "dance"]},
]

with open('../../data.json', 'w') as f:
    json.dump(data, f)
  1. 創(chuàng)建一個(gè).py文件,編寫(xiě)生產(chǎn)者程序,來(lái)實(shí)現(xiàn)將JSON文件數(shù)據(jù)發(fā)送給Kafka的功能
# 可以使用 Python 的 json 模塊讀取 data.json 文件,并將數(shù)據(jù)轉(zhuǎn)換為字符串后發(fā)送給 Kafka

from kafka import KafkaProducer
import json

data = [
    {
        "name": "Tony",
        "age": 21,
        "hobbies": ["basketball", "tennis"]
    },
    {
        "name": "Lisa",
        "age": 20,
        "hobbies": ["sing", "dance"]
    }
]

producer = KafkaProducer(bootstrap_servers='localhost:9092')
for item in data:
    # 將數(shù)據(jù)轉(zhuǎn)換為字符串格式并發(fā)送給 Kafka 主題 test
    message = json.dumps(item).encode('utf-8')
    producer.send('test', value=message)

producer.close()

運(yùn)行結(jié)果如下圖所示:
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言

  1. 創(chuàng)建一個(gè).py文件,編寫(xiě)消費(fèi)者程序,來(lái)實(shí)現(xiàn)讀取Kafka的JSON格式數(shù)據(jù),并手動(dòng)提交偏移量的功能
# 我們可以使用 Kafka 消費(fèi)者 API 進(jìn)行數(shù)據(jù)消費(fèi),并在處理完每個(gè)消息后手動(dòng)提交偏移量。

from kafka import KafkaConsumer
import json

# 配置 Kafka 消費(fèi)者,指定主題和分組等信息
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # 禁止自動(dòng)提交偏移量
    group_id='my-group')

# 循環(huán)消費(fèi) Kafka 消息
for message in consumer:
    # 將傳入的二進(jìn)制消息內(nèi)容解碼為 JSON 格式的字符串
    item = json.loads(message.value.decode('utf-8'))
    print(item)

    # 手動(dòng)提交偏移量,確保下次消費(fèi)時(shí)從正確的位置開(kāi)始
    consumer.commit()

運(yùn)行結(jié)果如下圖所示:
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言

三、Kafka消費(fèi)者訂閱分區(qū)

1.實(shí)驗(yàn)要求

在命令行窗口中啟動(dòng)Kafka后,手動(dòng)創(chuàng)建主題 “assign_topic” ,分區(qū)數(shù)量為2。具體命令如下:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic

根據(jù)上面給出的主題,完成如下操作。
(1)編寫(xiě)生產(chǎn)者程序,以通過(guò)唯一標(biāo)識(shí)符UUID作為消息,發(fā)送給主題 “ assign_topic” 。
(2)編寫(xiě)消費(fèi)者程序1,訂閱主題的分區(qū)0,只消費(fèi)分區(qū)0數(shù)據(jù)。
(3)編寫(xiě)消費(fèi)者程序2,訂閱主題的分區(qū)1,只消費(fèi)分區(qū)1數(shù)據(jù)。

2.在終端操作

首先要完成主題以及分區(qū)的創(chuàng)建才能編寫(xiě)程序,不然程序會(huì)報(bào)錯(cuò)
步驟:

  1. 使用windows+r,在彈窗中輸入cmd打開(kāi)終端
  2. 在終端中輸入命令,創(chuàng)建主題和分區(qū):
cd D:\kafka_2.12-2.4.0(這個(gè)是你安裝kafka的路徑)
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic

結(jié)果如下圖(這是我之前已經(jīng)創(chuàng)建好的結(jié)果圖):kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言

3.在PyCharm中操作

  1. 創(chuàng)建一個(gè).py文件,寫(xiě)入以下代碼,用于實(shí)現(xiàn)編寫(xiě)生產(chǎn)者程序,以通過(guò)唯一標(biāo)識(shí)符UUID作為消息,發(fā)送給主題 “ assign_topic的功能:
from kafka import KafkaProducer
import uuid

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

for i in range(5):
    message = str(uuid.uuid4()).encode('utf-8')
    producer.send('assign_topic', value=message)

producer.close()

運(yùn)行結(jié)果如下圖所示:
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言

  1. 創(chuàng)建一個(gè).py文件,寫(xiě)入以下代碼,用于實(shí)現(xiàn)訂閱主題的分區(qū)0,只消費(fèi)分區(qū)0數(shù)據(jù)的功能:
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    consumer_timeout_ms=1000
)
consumer.assign([TopicPartition('assign_topic', 0)])

for message in consumer:
    print("Partition 0 - Message value: {}".format(message.value))

consumer.close()

運(yùn)行結(jié)果如下圖所示:
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言

  1. 創(chuàng)建一個(gè).py文件,寫(xiě)入以下代碼,用于實(shí)現(xiàn)訂閱主題的分區(qū)1,只消費(fèi)分區(qū)1數(shù)據(jù)的功能:
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    consumer_timeout_ms=1000
)
consumer.assign([TopicPartition('assign_topic', 1)])

for message in consumer:
    print("Partition 1 - Message value: {}".format(message.value))

consumer.close()

運(yùn)行結(jié)果如下圖所示:
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言

三、實(shí)驗(yàn)小bug

1. Kafka連接報(bào)錯(cuò):kafka.errors.NoBrokersAvailable: NoBrokersAvailable 是什么原因:?
答:是因?yàn)槌绦蜻\(yùn)行了多次的原因
把tmp文件和logs文件里面的東西都刪掉,就可以解決了
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言
kafka windows,python,kafka,python,人工智能,大數(shù)據(jù),開(kāi)發(fā)語(yǔ)言
2. 為什么消費(fèi)者程序1中有東西輸出而消費(fèi)者程序2中什么卻什么也沒(méi)輸出?

消費(fèi)者程序1和消費(fèi)者程序2是對(duì)同一個(gè)主題的兩個(gè)消費(fèi)者應(yīng)用程序??梢葬槍?duì)以下情況進(jìn)行分析。
在主題 assign_topic 中,Kafka有多個(gè)分區(qū),可用于并行處理消息。在這里,被消費(fèi)的消息都來(lái)自此主題的第一個(gè)分區(qū)(即分區(qū) 0)。
消費(fèi)者程序1使用了 .subscribe() 方法來(lái)訂閱主題,這將導(dǎo)致消費(fèi)者加入到消費(fèi)組中,然后通過(guò)負(fù)載均衡策略從所有分區(qū)接收消息。因此,消費(fèi)者程序1輸出打印了分區(qū) 0 中的消息。
消費(fèi)者程序2使用了 .assign() 方法手動(dòng)分配消費(fèi)者處理的分區(qū),而且只分配了主題 assign_topic 的第一個(gè)分區(qū)(即分區(qū) 0)。但是,由于該程序沒(méi)有運(yùn)行足夠長(zhǎng)的時(shí)間,并且沒(méi)有消費(fèi)到任何未提交的偏移量,所以當(dāng)應(yīng)用程序終止時(shí)不會(huì)向Kafka服務(wù)器發(fā)送任何提交請(qǐng)求,這就可能導(dǎo)致在下一次啟動(dòng)時(shí)重復(fù)消費(fèi)確認(rèn)過(guò)的消息。因此,在生產(chǎn)環(huán)境中,請(qǐng)務(wù)必根據(jù)具體情況定期地提交所消費(fèi)的分區(qū)的偏移量。


總結(jié)

以上就是對(duì)Kafka的基本使用方法的實(shí)驗(yàn)啦,有不明白的地方可以留言哦,希望能共同進(jìn)步~~????????????文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-732112.html

到了這里,關(guān)于【超級(jí)詳細(xì)】熟悉Kafka的基本使用方法的實(shí)驗(yàn)【W(wǎng)indows】的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • c#客戶端Kafka的使用方法

    c#客戶端Kafka的使用方法

    Apache Kafka是一個(gè)分布式流處理平臺(tái),最初由LinkedIn開(kāi)發(fā),現(xiàn)在是Apache軟件基金會(huì)的頂級(jí)項(xiàng)目之一。Kafka能夠處理大規(guī)模的實(shí)時(shí)數(shù)據(jù)流,支持高可靠性、高可擴(kuò)展性、低延遲和高吞吐量。它主要用于構(gòu)建實(shí)時(shí)數(shù)據(jù)管道和流式處理應(yīng)用程序。 Kafka的核心概念包括:Producer(生產(chǎn)者)

    2024年02月12日
    瀏覽(22)
  • docker基本使用方法

    docker基本使用方法

    Docker 可以讓開(kāi)發(fā)者打包他們的應(yīng)用以及依賴(lài)包到一個(gè)輕量級(jí)、可移植的容器中,然后發(fā)布到任何流行的 Linux 機(jī)器上,也可以實(shí)現(xiàn)虛擬化。Docker 使您能夠?qū)?yīng)用程序與基礎(chǔ)架構(gòu)分開(kāi),從而可以快速交付軟件。通過(guò)利用 Docker 的方法來(lái)快速交付,測(cè)試和部署代碼,您可以大大減

    2024年02月13日
    瀏覽(26)
  • Wireshark基本使用方法

    Wireshark基本使用方法

    目錄 1、Wireshark介紹 1.1?Wireshark使用 1.2?支持的協(xié)議 2.Wireshark主要應(yīng)用 3.Wireshark安裝 ?4.Wireshark頁(yè)面介紹 4.1?分組列表 ?4.2?分組詳情 ?4.3?分組字節(jié)流 ?5.Wireshark導(dǎo)航 5.1?開(kāi)始捕獲分組 5.2?停止捕獲分組 5.3 重新開(kāi)始當(dāng)前捕獲 5.4、捕獲選項(xiàng) 5.5?打開(kāi)以保存的捕獲文件 5.6?保存捕

    2024年02月13日
    瀏覽(20)
  • vim基本使用方法

    vim是linux上一個(gè)有多個(gè)編輯模式的編輯器。 這里主要介紹三種模式: 命令模式(Normal mode) 執(zhí)行命令的模式,主要任務(wù)就是控制光標(biāo)移動(dòng)、復(fù)制和刪除。 插入模式(Insert mode) 可以進(jìn)行文字輸入,編寫(xiě)代碼模式。 末行/底行模式(last line mode) 文件保存退出,文本替換、列出

    2024年02月12日
    瀏覽(19)
  • uCharts基本使用方法

    uCharts基本使用方法

    首先下載ucharts文件 https://gitee.com/uCharts/uCharts 下載下來(lái)看到有這些文件,小伙伴們可以先去示例項(xiàng)目里面看 引入u-charts.js文件,主要構(gòu)建就是new uCharts和配置context,其他的就跟其他charts配置一樣 可以看例子寫(xiě)的,也可以自己試驗(yàn)一波 方法寫(xiě)入兩種方式 第一種方式 ucharts下載

    2024年02月04日
    瀏覽(19)
  • pandas庫(kù)基本使用方法

    Pandas是Python中一個(gè)非常流行的數(shù)據(jù)處理庫(kù),它提供了一些強(qiáng)大的數(shù)據(jù)結(jié)構(gòu)和數(shù)據(jù)分析工具,可以幫助我們更方便、快捷地處理數(shù)據(jù)。下面我們來(lái)介紹一下Pandas的使用方法。 1.導(dǎo)入Pandas庫(kù) 在使用Pandas之前,需要先導(dǎo)入Pandas庫(kù)。通常的做法是使用import語(yǔ)句導(dǎo)入Pandas庫(kù),并給它起一

    2024年02月10日
    瀏覽(31)
  • anaconda 創(chuàng)建虛擬環(huán)境、激活,使用的基本方法及安裝包的基本方法

    anaconda 創(chuàng)建虛擬環(huán)境、激活,使用的基本方法及安裝包的基本方法

    第一步 打開(kāi)Anaconda Prompt 可以看到這里是base環(huán)境。 第二步 我們現(xiàn)在要?jiǎng)?chuàng)建一個(gè)新的虛擬環(huán)境,名叫test,且python版本為3.8 在安裝過(guò)程中會(huì)出現(xiàn)下面這個(gè)選項(xiàng),輸入y就好了 創(chuàng)建成功如下圖所示!hiahia! 我們已經(jīng)學(xué)會(huì)如何創(chuàng)建新的環(huán)境了!沒(méi)錯(cuò)!我們非常棒!下面我們就看看,

    2024年03月14日
    瀏覽(22)
  • java反射的基本使用方法

    當(dāng)我們使用 Java 開(kāi)發(fā)時(shí),有時(shí)需要獲取某個(gè)類(lèi)的信息,例如類(lèi)的屬性、方法和構(gòu)造函數(shù)等,然后在程序運(yùn)行期間動(dòng)態(tài)地操作它們。Java 反射就是用來(lái)實(shí)現(xiàn)這個(gè)目的的一種機(jī)制。 Java 反射(Reflection)是 Java 編程語(yǔ)言在運(yùn)行時(shí)動(dòng)態(tài)獲取類(lèi)的信息以及動(dòng)態(tài)調(diào)用對(duì)象方法的能力。它可

    2024年02月16日
    瀏覽(25)
  • Loguru基本、進(jìn)階使用方法小結(jié)。

    Loguru基本、進(jìn)階使用方法小結(jié)。

    loguru是一個(gè)開(kāi)源的Python日志記錄器,它提供了簡(jiǎn)單且易于使用的接口,同時(shí)具有高度的可定制性。loguru的特點(diǎn)包括:支持格式化日志、記錄到文件或終端、支持自動(dòng)清理日志、支持旋轉(zhuǎn)日志等。 loguru的基本使用方法非常簡(jiǎn)單,只需要導(dǎo)入loguru模塊,并使用logger函數(shù)創(chuàng)建一個(gè)日

    2024年02月16日
    瀏覽(33)
  • Pytorch基本概念和使用方法

    Pytorch基本概念和使用方法

    目錄 1 Adam及優(yōu)化器optimizer(Adam、SGD等)是如何選用的? 1)Momentum 2)RMSProp 3)Adam 2 Pytorch的使用以及Pytorch在以后學(xué)習(xí)工作中的應(yīng)用場(chǎng)景。 1)Pytorch的使用 2)應(yīng)用場(chǎng)景 3 不同的數(shù)據(jù)、數(shù)據(jù)集加載方式以及加載后各部分的調(diào)用處理方式。如DataLoder的使用、datasets內(nèi)置數(shù)據(jù)集的使

    2024年02月07日
    瀏覽(27)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包