docker搭建kafka集群完整版(windows)
1.安裝docker desktop.
打開docker官網(wǎng),下載docker desktop,這里直接給出網(wǎng)址:Install Docker Desktop on Windows | Docker Docs
如下圖,點(diǎn)擊下載即可。
下載好后 點(diǎn)擊運(yùn)行exe文件,我們采用交互式安裝程序。
安裝完成后直接重啟即可,默認(rèn)安裝在c盤,如果不想安裝在c盤就采用命令行的方式安裝。官網(wǎng)有教程。
點(diǎn)擊接受
之后點(diǎn)擊登錄
當(dāng)然不登錄也沒關(guān)系。
接下來我們安裝一下Linux內(nèi)核,打開windows powershell,運(yùn)行wsl --date,即可(看情況,電腦沒有或軟件沒有提示的情況就要安裝)
配置一下環(huán)境,如下,打開右上角的設(shè)置,更改下面的數(shù)據(jù)位置
之后配置國(guó)內(nèi)鏡像源,可用參考網(wǎng)上給的代碼
{
"registry-mirrors": [
"https://registry.docker-cn.com",
"http://hub-mirror.c.163.com",
"https://docker.mirrors.ustc.edu.cn",
"https://cr.console.aliyun.com",
"https://mirror.ccs.tencentyun.com"
],
"builder": {
"gc": {
"defaultKeepStorage": "20GB",
"enabled": true
}
},
"experimental": false,
"features": {
"buildkit": true
}
}
之后點(diǎn)擊應(yīng)用并重啟即可。
過程中可能出現(xiàn)的問題:
1.Docker 一直starting
遇到這種情況,一般是因?yàn)闆]有安裝wsl 2(或者沒有打開),安裝即可。在安裝這個(gè)之前需要啟用虛擬化,一般都開啟了,這里不詳細(xì)介紹。
安裝完成后重啟電腦即可。
如下圖,便是安裝完成了
上面有一個(gè)是我拉取的一個(gè)image,一開始沒有。
這樣我們便可以在windows上使用docker了。
注意:后續(xù)如果關(guān)閉后一直顯示正在啟動(dòng)中建議重啟電腦重新啟動(dòng),親測(cè)有效。
2.下載鏡像
打開Windows powershell,運(yùn)行下面命令:
-
docker pull bitnami/kafka
-
docker pull zookeeper
版本隨意,但建議都采用最新的版本,老版本可能會(huì)出現(xiàn)版本沖突,但你不知道會(huì)不會(huì)發(fā)生沖突,容易出問題。
準(zhǔn)備工作:
在開始新建集群之前,新建好文件夾,根據(jù)下面的yml配置文件選擇的地址來建文件夾(冒號(hào)后面的可以不建),如下圖(可以自己改變位置):
不然數(shù)據(jù)會(huì)默認(rèn)安裝到C盤。
3.創(chuàng)建docker網(wǎng)絡(luò)
? 運(yùn)行下面命令:
docker network create zk-net
如下圖:
4.docker compose 搭建kafka集群
之前還要搭建zookeeper集群,用了倆個(gè)compose.yml文件,這里合并一下,縮減操作。
創(chuàng)建一個(gè)yml文件,名字隨意,這里取名為docker-compose-kafka.yml
文件配置如下:
version: "3"
networks:
zk-net:
external:
name: zk-net
services:
z1:
image: 'zookeeper:latest'
container_name: z1
hostname: z1
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=z2:2888:3888;2181 server.3=z3:2888:3888;2181
ALLOW_ANONYMOUS_LOGIN: "yes"
networks:
- zk-net
ports:
- 2181:2181
- 8081:8080
volumes:
- /D/docker_desktop/z1/z1/data:/data
- /D/docker_desktop/z1/z1/datalog:/datalog
z2:
image: 'zookeeper:latest'
container_name: z2
hostname: z2
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=z1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=z3:2888:3888;2181
ALLOW_ANONYMOUS_LOGIN: "yes"
networks:
- zk-net
ports:
- 2182:2181
- 8082:8080
volumes:
- /D/docker_desktop/z1/z2/data:/data
- /D/docker_desktop/z1/z2/datalog:/datalog
z3:
image: 'zookeeper:latest'
container_name: z3
hostname: z3
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=z1:2888:3888;2181 server.2=z2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
ALLOW_ANONYMOUS_LOGIN: "yes"
networks:
- zk-net
ports:
- 2183:2181
- 8083:8080
volumes:
- /D/docker_desktop/z1/z3/data:/data
- /D/docker_desktop/z1/z3/datalog:/datalog
kafka1:
image: 'bitnami/kafka:latest'
restart: always
container_name: kafka1
hostname: kafka1
ports:
- '9092:9092'
environment:
- ALLOW_NONE_AUTHENTICATION=yes
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=z1:2181,z2:2181,z3:2181
volumes:
- /D/docker_desktop/k1/kafka1:/bitnami/kafka
networks:
- zk-net
kafka2:
image: 'bitnami/kafka:latest'
restart: always
container_name: kafka2
hostname: kafka2
ports:
- '9093:9093'
environment:
- ALLOW_NONE_AUTHENTICATION=yes
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_BROKER_ID=2
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9093
- KAFKA_CFG_ZOOKEEPER_CONNECT=z1:2181,z2:2181,z3:2181
volumes:
- /D/docker_desktop/k1/kafka2:/bitnami/kafka
networks:
- zk-net
kafka3:
image: 'bitnami/kafka:latest'
restart: always
container_name: kafka3
hostname: kafka3
ports:
- '9094:9094'
environment:
- ALLOW_NONE_AUTHENTICATION=yes
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_BROKER_ID=3
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9094
- KAFKA_CFG_ZOOKEEPER_CONNECT=z1:2181,z2:2181,z3:2181
volumes:
- /D/docker_desktop/k1/kafka3:/bitnami/kafka
networks:
- zk-net
5.啟動(dòng)kafka集群
之后在windows powershell上運(yùn)行下面命令:
docker-compose -f D:\docker_desktop\z1\docker-compose_kafka.yml up -d
停止配置文件運(yùn)行代碼如下:(不要運(yùn)行,一般是上面代碼報(bào)錯(cuò)再運(yùn)行停止服務(wù)的)
docker-compose -f D:\docker_desktop\z1\docker-compose_kafka.yml stop
6.使用docker desktop
如下圖,便是docker desktop的一個(gè)界面:
點(diǎn)擊kafka鏡像的狀態(tài)或者左上角的容器,如下圖:
選擇一個(gè)容器進(jìn)入即可:
進(jìn)入后如下圖:
其中,logs代表日志信息,inspect可以查看kafka的配置信息,包括網(wǎng)絡(luò)和集群等信息,如下圖:
exec就是容器內(nèi)部了,可以通過寫指令來操控容器,如圖所示:
容器內(nèi)部其實(shí)就是一個(gè)Linux系統(tǒng)樣的東西,在Files里面可以看到容器的結(jié)構(gòu)。如圖所示:
更多關(guān)于docker desktop的使用請(qǐng)自己摸索或查閱。這里只是簡(jiǎn)單介紹一下方便下面的教學(xué)。
6.創(chuàng)建主題
搭建好集群后我們需要?jiǎng)?chuàng)建一個(gè)主題,來進(jìn)行后面的測(cè)試。
進(jìn)入kafka容器,倆種方式,一種是通過docker desktop進(jìn)入,還有一種是通過命令行的方式進(jìn)入,命令行的方式自己去搜,這里通過docker desktop進(jìn)入,跟上面一樣進(jìn)入一個(gè)kafka點(diǎn)擊exec即可,如下圖:
注意有上角的灰色垃圾箱代表清空桌面,紅色的代表刪除容器。
剛進(jìn)入容器默認(rèn)在/目錄下,我們需要進(jìn)入到kafka的bin目錄下,使用cd命令即可,如下圖:
注意kafka的文件夾在/opt/bitnami/目錄里面。
之后我們通過下面命令創(chuàng)建主題:
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic topic1
-
./kafka-topics.sh --create
: 這一部分告訴Kafka命令行工具你想要?jiǎng)?chuàng)建一個(gè)新的主題。 -
--bootstrap-server localhost:9092
: 這一部分指定了Kafka服務(wù)的地址和端口。在這個(gè)例子中,服務(wù)運(yùn)行在本地主機(jī)的9092端口。 -
--replication-factor 3
: 這一部分指定了主題的副本因子,即數(shù)據(jù)在Kafka集群中的復(fù)制次數(shù)。在這個(gè)例子中,數(shù)據(jù)將被復(fù)制3次。 -
--partitions 3
: 這一部分指定了主題的分區(qū)數(shù)。在Kafka中,數(shù)據(jù)被組織成多個(gè)分區(qū),每個(gè)分區(qū)可以獨(dú)立地處理和存儲(chǔ)。在這個(gè)例子中,主題將有3個(gè)分區(qū)。 -
--topic topic1
: 這一部分指定了要?jiǎng)?chuàng)建的主題名稱。在這個(gè)例子中,主題名稱為"topic1"。
注意:在Kafka中,副本數(shù)不可以大于分區(qū)數(shù)。因?yàn)楦北臼且阅夸洿鎯?chǔ)在各個(gè)broker節(jié)點(diǎn)的data目錄下,如果副本數(shù)量大于broker節(jié)點(diǎn)數(shù)量,那么在同一個(gè)Broker節(jié)點(diǎn)的data目錄下會(huì)有兩個(gè)一樣的文件夾,這是不允許的。
網(wǎng)上的命令都有點(diǎn)老了,可能會(huì)報(bào)錯(cuò),建議用這個(gè)命令,或者用–help查閱怎么使用,如下:
./kafka-topics.sh --help
使用下面命令可以查看主題:
kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic1
如下圖:
7.命令行使用生產(chǎn)者和消費(fèi)者程序
在容器內(nèi)運(yùn)行如下命令,打開消費(fèi)端:
./kafka-console-consumer.sh --from-beginning --topic ysh --bootstrap-server localhost:9092
之后在打開windows powershell進(jìn)入一個(gè)kafka容器打開生產(chǎn)者程序,命令如下:
docker exec -it kafka2 /bin/bash
之后進(jìn)入kafka bin目錄下 ,命令如下:
cd /opt/bitnami/kafka/bin
打開生產(chǎn)者程序,命令如下:
./kafka-console-producer.sh --broker-list localhost:9093 --topic topic1
注意:上面的我端口是9093,不是9092,9092是kafka1的端口,而我進(jìn)入的是kafka2,不然會(huì)報(bào)錯(cuò),要想在所有kafka集群里面都可以直接用9092就需要改一下上面的配置文件yml,如下圖:
將他們的端口都映射到9092就可以了,跟zookeeper一樣,每個(gè)kafka都要改。
之后再生產(chǎn)者端寫入數(shù)據(jù),可以看到消費(fèi)端有數(shù)據(jù)出來,如下圖:
8.kafka Java API 編寫生產(chǎn)者程序和消費(fèi)者程序(以讀取股票信息為例)
在編寫代碼前需要先在C盤,windows/system32/drivers/hosts文件里面將kafka的網(wǎng)絡(luò)添加進(jìn)去,不然idea無法識(shí)別,idea機(jī)制問題,
這樣就沒問題了。
導(dǎo)入依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.2</version>
</dependency>
編寫生產(chǎn)者程序:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.*;
import java.util.Properties;
public class KafkaProducerTest {
public static void main(String[] args) throws FileNotFoundException, UnsupportedEncodingException {
Properties props = new Properties();
//1.指定Kafaka集群的ip地址和端口號(hào)
props.put("bootstrap.servers", "kafka1:9092,kafka2:9093,kafka3:9094");
//2.等待所有副本節(jié)點(diǎn)的應(yīng)答
props.put("acks", "all");
//3.消息發(fā)送最大嘗試次數(shù)
props.put("retries", 0);
//4.指定一批消息處理次數(shù)
props.put("batch.size", 16384);
//5.指定請(qǐng)求延時(shí)
props.put("linger.ms", 1);
//6.指定緩存區(qū)內(nèi)存大小
props.put("buffer.memory", 33554432);
//7.設(shè)置key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//8.設(shè)置value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 9、生產(chǎn)數(shù)據(jù)
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// 定義CSV文件路徑
String csvFile = "C:\\Users\\asus\\Desktop\\data\\股票a.csv";
// 讀取CSV文件并發(fā)送到Kafka
try {
//BufferedReader reader = new BufferedReader(new FileReader(csvFile),);
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(csvFile), "GBK"));
String line;
reader.readLine();
while ((line = reader.readLine()) != null) {
String[] data = line.split(","); // 假設(shè)CSV文件使用逗號(hào)分隔
String key = data[1]; // 假設(shè)交易筆數(shù)為關(guān)鍵字
String value = data[0] + "," + data[1] + "," + data[2] + "," + data[3] + "," + data[4] + "," + data[5] + "," + data[6] + "," + data[7] + "," + data[8]; // 假設(shè)交易總量為值,使用逗號(hào)分隔
producer.send(new ProducerRecord<String, String>("ysh",value));
System.out.printf(value+"\n");
}
producer.close(); // 關(guān)閉Kafka生產(chǎn)者
}
catch (IOException e) {
System.out.printf("文件打開失敗");
// 處理IO異常
}
}
}
編寫消費(fèi)者程序:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class KafkaConsumerTest {
public static void main(String[] args) {
//1、準(zhǔn)備配置文件
Properties props = new Properties();
//2、指定kafka集群主機(jī)名和端口號(hào)
//props.put("zookeeper.connect", "localhost:2181");
props.put("bootstrap.servers", "kafka1:9092,kafka2:9093,kafka3:9094");
//3、指定消費(fèi)者組id,在同一時(shí)刻同一消費(fèi)組中只有一個(gè)線程可以
//去消費(fèi)一個(gè)分區(qū)消息,不同的消費(fèi)組可以去消費(fèi)同一個(gè)分區(qū)消息
props.put("group.id", "consumer");
//4、自動(dòng)提交偏移量
props.put("enable.auto.commit", "true");
//5、自動(dòng)提交時(shí)間間隔,每秒提交一次
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset","earliest");
props.put("client.id", "zy_client_id");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
//6、訂閱消息,這里的topic可以是多個(gè)
kafkaConsumer.subscribe(Arrays.asList("ysh"));
AtomicInteger count = new AtomicInteger(0); // 原子整數(shù)用于統(tǒng)計(jì)交易筆數(shù)之和
AtomicLong totalAmount = new AtomicLong(0); // 原子長(zhǎng)整型用于統(tǒng)計(jì)交易總量之和
//System.out.printf("yse"); //7、獲取消息
long startTime = System.currentTimeMillis();
while (true) {
//每隔10s拉取一次
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
if (records.isEmpty()) {
// 如果 records 為空,則跳過當(dāng)前循環(huán)
continue;
}
for (ConsumerRecord<String, String> record : records) {
System.out.printf("value=%s%n", record.value());
String value = record.value(); // 獲取消息值(交易總量)
String[] values = value.split(","); // 使用逗號(hào)分隔值(假設(shè)格式為交易筆數(shù),交易總量)
int tradeCount =1; // 解析交易筆數(shù)(關(guān)鍵字)為整數(shù)并加到總和中(假設(shè)第一列是交易筆數(shù))
long tradeAmount = Long.parseLong(values[4]); // 解析交易總量(值)為長(zhǎng)整數(shù)并加到總和中(假設(shè)第二列是交易總量)
totalAmount.addAndGet(tradeAmount); //
count.addAndGet(tradeCount);
}
long endTime = System.currentTimeMillis();
System.out.printf("tradeCount=%d,totalAmount=%d%n",count.get(),totalAmount.get());
System.out.printf("total_time=%d ms %n",endTime-startTime);
}
}
}
先運(yùn)行消費(fèi)者程序,再運(yùn)行生產(chǎn)者程序結(jié)果如下:
測(cè)試完畢,下面進(jìn)行參數(shù)調(diào)優(yōu)和結(jié)果比較。
這里以緩存區(qū)內(nèi)存大小為例:
下面是內(nèi)存大小為335544b時(shí)的運(yùn)行結(jié)果,花費(fèi)時(shí)間為3966ms
下圖為緩存大小為33554b時(shí)的運(yùn)行結(jié)果,花費(fèi)時(shí)間為3952ms
需要注意的是在緩存大小一定的情況下,花費(fèi)時(shí)間也不是固定的,還收網(wǎng)絡(luò)速度等因素的影響,下圖為緩存大小為33554b時(shí)花費(fèi)時(shí)間為3774,較上圖速度明顯減小,但緩存大小未變。
文章來源:http://www.zghlxwxcb.cn/news/detail-785485.html
9.總結(jié)
在window上搭建kafka集群并用java API 的過程中,因?yàn)閷?duì)很多知識(shí)點(diǎn)的不了解,導(dǎo)致過程之中發(fā)生了很多意外,比如如何使用window desktop,如何在windows上面搭建docker,docker如何搭建kafka集群,如何配置網(wǎng)絡(luò)連接,搭建好kafka后如何創(chuàng)建主題,如何查看主題,如何運(yùn)行生產(chǎn)者程序,如何運(yùn)行消費(fèi)者程序,idea如何連接容器內(nèi)的kafka集群,idea無法連接容器kafka集群,消息遺漏等一系列問題。在搭建kafka集群的過程中,雖然遇到了很多問題,但也讓我學(xué)到了很多,包括kafka和docker的一些常用命令,kafka API,消息遺漏,網(wǎng)絡(luò)連接通信的知識(shí)。文章來源地址http://www.zghlxwxcb.cn/news/detail-785485.html
到了這里,關(guān)于docker搭建kafka集群并測(cè)試完整版的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!