第 1 章:Kafka概述
1.1 定義
Kafka是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列,主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。
發(fā)布/訂閱:消息的發(fā)布者不會(huì)將消息直接發(fā)送給特定的訂閱者,而是將發(fā)布的消息分為不同的類別,訂閱者只接收感興趣的消息。
1.2 消息隊(duì)列
目前企業(yè)中比較常見的消息隊(duì)列產(chǎn)品主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。
在大多數(shù)場(chǎng)景主要采用Kafka作為消息隊(duì)列
在JavaEE開發(fā)中主要采用ActiveMQ、RabbitMQ、RocketMQ
1.2.1 傳統(tǒng)消息隊(duì)列的應(yīng)用場(chǎng)景
1、傳統(tǒng)的消費(fèi)隊(duì)列的主要應(yīng)用場(chǎng)景有:緩存/削峰(緩沖)、解耦(少依賴)、異步通信(不必要及時(shí)處理)
1)緩存/削峰(緩沖):有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。
2)解耦:允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵循同樣的接口約束。
3)異步通信:允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它,然后再需要的時(shí)候再去處理它們。
1.2.2 消息隊(duì)列的兩種模式
消息隊(duì)列主要分為兩種模式:點(diǎn)對(duì)點(diǎn)模式(一個(gè)生產(chǎn)者對(duì)口一個(gè)消費(fèi)者)和發(fā)布/訂閱模式(一對(duì)多)
1.3 Kafka基礎(chǔ)框架
1、Producer:消息生產(chǎn)者,就是向Kafka broker發(fā)消息的客戶端
2、Consumer:消息消費(fèi)者,向kafka broker獲取消息的客戶端
3、Consumer Group(CG):消費(fèi)者組,由多個(gè)consumer組成。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)broker可以由多個(gè)不同的topic,一個(gè)topic下的一個(gè)分區(qū)只能被一個(gè)消費(fèi)者組內(nèi)的一個(gè)消費(fèi)者所消費(fèi);消費(fèi)者之間不受影響。消費(fèi)者組是邏輯上的一個(gè)訂閱者。
4、Broker:一個(gè)kafka服務(wù)器就是一個(gè)broker。一個(gè)broker可以容納多個(gè)不同topic
5、Topic:可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者面向的都是一個(gè)topic
6、Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的topic可以分布到多個(gè)broker(即服務(wù)器)上,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition是一個(gè)有序的隊(duì)列
7、Replica:副本,為保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的partition數(shù)據(jù)不丟失,且kafka仍然能夠繼續(xù)工作,kafka提供了副本機(jī)制,一個(gè)topic的每個(gè)發(fā)你去都有若干個(gè)副本,一個(gè)leader和若干個(gè)follower
8、leader:每個(gè)分區(qū)副本中的”主“,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是leader
9、followeer:每個(gè)分區(qū)副本中的“從”,實(shí)現(xiàn)于leader副本保持同步,在leader發(fā)送故障時(shí),稱為新的leader
第 2 章:Kafka快速入門
2.1 安裝部署
2.1.1 集群部署
2.1.2 集群部署
1、官方下載地址:http://kafka.apache.org/downloads.html
2、上傳安裝包到102的/opt/software目錄下:
[atguigu@hadoop102 software]$ ll
-rw-rw-r--. 1 atguigu atguigu 86486610 3月 10 12:33 kafka_2.12-3.0.0.tgz
3、解壓安裝包到/opt/module/目錄下
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
4、進(jìn)入到/opt/module目錄下,修改解壓包名為kafka
[atguigu@hadoop102 module]$ mv kafka_2.12-3.0.0 kafka
5、修改config目錄下的配置文件server.properties內(nèi)容如下
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties
#broker的全局唯一編號(hào),不能重復(fù),只能是數(shù)字。
broker.id=102
#處理網(wǎng)絡(luò)請(qǐng)求的線程數(shù)量
num.network.threads=3
#用來(lái)處理磁盤IO的線程數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka運(yùn)行日志(數(shù)據(jù))存放的路徑,路徑不需要提前創(chuàng)建,kafka自動(dòng)幫你創(chuàng)建,可以配置多個(gè)磁盤路徑,路徑與路徑之間可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在當(dāng)前broker上的分區(qū)個(gè)數(shù)
num.partitions=1
#用來(lái)恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
# 每個(gè)topic創(chuàng)建時(shí)的副本數(shù),默認(rèn)時(shí)1個(gè)副本
offsets.topic.replication.factor=1
#segment文件保留的最長(zhǎng)時(shí)間,超時(shí)將被刪除
log.retention.hours=168
#每個(gè)segment文件的大小,默認(rèn)最大1G
log.segment.bytes=1073741824
# 檢查過期數(shù)據(jù)的時(shí)間,默認(rèn)5分鐘檢查一次是否數(shù)據(jù)過期
log.retention.check.interval.ms=300000
#配置連接Zookeeper集群地址(在zk根目錄下創(chuàng)建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
6、配置環(huán)境變量
[atguigu@hadoop102 kafka]$ sudo vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[atguigu@hadoop102 kafka]$ source /etc/profile
7、分發(fā)環(huán)境變量文件并source
[atguigu@hadoop102 kafka]$ xsync /etc/profile.d/my_env.sh
==================== hadoop102 ====================
sending incremental file list
sent 47 bytes received 12 bytes 39.33 bytes/sec
total size is 371 speedup is 6.29
==================== hadoop103 ====================
sending incremental file list
my_env.sh
rsync: mkstemp "/etc/profile.d/.my_env.sh.Sd7MUA" failed: Permission denied (13)
sent 465 bytes received 126 bytes 394.00 bytes/sec
total size is 371 speedup is 0.63
rsync error: some files/attrs were not transferred (see previous errors) (code 23) at main.c(1178) [sender=3.1.2]
==================== hadoop104 ====================
sending incremental file list
my_env.sh
rsync: mkstemp "/etc/profile.d/.my_env.sh.vb8jRj" failed: Permission denied (13)
sent 465 bytes received 126 bytes 1,182.00 bytes/sec
total size is 371 speedup is 0.63
rsync error: some files/attrs were not transferred (see previous errors) (code 23) at main.c(1178) [sender=3.1.2],
# 這時(shí)你覺得適用sudo就可以了,但是真的是這樣嗎?
[atguigu@hadoop102 kafka]$ sudo xsync /etc/profile.d/my_env.sh
sudo: xsync:找不到命令
# 這時(shí)需要將xsync的命令文件,copy到/usr/bin/下,sudo(root)才能找到xsync命令
[atguigu@hadoop102 kafka]$ sudo cp /home/atguigu/bin/xsync /usr/bin/
[atguigu@hadoop102 kafka]$ sudo xsync /etc/profile.d/my_env.sh
# 在每個(gè)節(jié)點(diǎn)上執(zhí)行source命令,如何你沒有xcall腳本,就手動(dòng)在三臺(tái)節(jié)點(diǎn)上執(zhí)行source命令。
[atguigu@hadoop102 kafka]$ xcall source /etc/profile
8、分發(fā)安裝包
[atguigu@hadoop102 module]$ xsync kafka/
9、修改配置文件的brokerid
分別在hadoop103和104上修改配置文件server.properties中的broker.id=103、broker.id=104
注:broker.id不得重復(fù)
[atguigu@hadoop103 kafka]$ vim config/server.properties
broker.id=103
[atguigu@hadoop104 kafka]$ vim config/server.properties
broker.id=104
10、啟動(dòng)集群
1)先啟動(dòng)Zookeeper集群
[atguigu@hadoop102 kafka]$ zk.sh start
2)一次在102、103、104節(jié)點(diǎn)啟動(dòng)kafka
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties [atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties [atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
11、關(guān)閉集群
[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh
2.1.4 kafka群起腳本
1、腳本編寫
在/home/atguigu/bin目錄下創(chuàng)建文件kafka.sh腳本文件:
#! /bin/bash
if (($#==0)); then
echo -e "請(qǐng)輸入?yún)?shù):\n start 啟動(dòng)kafka集群;\n stop 停止kafka集群;\n" && exit
fi
case $1 in
"start")
for host in hadoop103 hadoop102 hadoop104
do
echo "---------- $1 $host 的kafka ----------"
ssh $host "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
;;
"stop")
for host in hadoop103 hadoop102 hadoop104
do
echo "---------- $1 $host 的kafka ----------"
ssh $host "/opt/module/kafka/bin/kafka-server-stop.sh"
done
;;
*)
echo -e "---------- 請(qǐng)輸入正確的參數(shù) ----------\n"
echo -e "start 啟動(dòng)kafka集群;\n stop 停止kafka集群;\n" && exit
;;
esac
2、腳本文件添加權(quán)限
[atguigu@hadoop102 bin]$ chmod +x kafka.sh
注意:
停止Kafka集群時(shí),一定要等kafka所有節(jié)點(diǎn)進(jìn)程全部停止后再停止Zookeeper集群。
因?yàn)閆ookeeper集群當(dāng)中記錄著kafka集群相關(guān)信息,Zookeeper集群一旦先停止,Kafka集群就沒有辦法再獲取停止進(jìn)程的信息,只能手動(dòng)殺死Kafka進(jìn)程了。
2.2 Kafka命令行操作
2.2.1 主題命令行操作
1、查看操作主題命令需要的參數(shù)
2、重要的參數(shù)如下
參數(shù) | 描述 |
---|---|
–bootstrap-server | 連接kafka Broker主機(jī)名稱和端口號(hào) |
–topic | 操作的topic名稱 |
–create | 創(chuàng)建主題 |
–delete | 刪除主題 |
–alter | 修改主題 |
–list | 查看所有主題 |
–describe | 查看主題詳細(xì)描述 |
–partitions | 設(shè)置主題分區(qū)數(shù) |
–replication-factor | 設(shè)置主題分區(qū)副本 |
–config | 更新系統(tǒng)默認(rèn)的配置 |
3、查看當(dāng)前服務(wù)器中的所有topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
4、創(chuàng)建一個(gè)主題名稱為first的topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic first
5、查看topic的詳情
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
Topic: first TopicId: EVV4qHcSR_q0O8YyD32gFg PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 102 Replicas: 102,103,104 Isr: 102,103,104
6、修改分區(qū)數(shù)(注意:分區(qū)數(shù)只能增加,不能減少)
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
7、再次查看Topic的詳情
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
Topic: first TopicId: EVV4qHcSR_q0O8YyD32gFg PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 102 Replicas: 102,103,104 Isr: 102,103,104
Topic: first Partition: 1 Leader: 103 Replicas: 103,104,102 Isr: 103,104,102
Topic: first Partition: 2 Leader: 104 Replicas: 104,102,103 Isr: 104,102,103
8、刪除topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
2.2.2 生產(chǎn)者命令行操作
1、查看命令行生產(chǎn)者的參數(shù)
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh
2、重要的參數(shù)如下:
參數(shù) | 描述 |
---|---|
–bootstrap-server | 連接kafka Broker主機(jī)名稱和端口號(hào) |
–topic | 操作的topic名稱 |
3、生產(chǎn)消息 |
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world
>atguigu atguigu
2.2.3 消費(fèi)者命令行操作
1、查看命令行消費(fèi)者的參數(shù)
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh
2、重要的參數(shù)如下:
參數(shù) | 描述 |
---|---|
–bootstrap-server | 連接kafka Broker主機(jī)名稱和端口號(hào) |
–topic | 操作的topic名稱 |
–from-beginning | 從頭開始消費(fèi) |
–group | 指定消費(fèi)者組名稱 |
3、消費(fèi)消息 |
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
4、從頭開始消費(fèi)
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
思考:再次查看當(dāng)前kafka中的topic列表,發(fā)現(xiàn)了什么?為什么?
第 3 章:Kafka生產(chǎn)者
3.1 生產(chǎn)者消息發(fā)送流程
3.1.1 發(fā)送原理
Kafka的Producer發(fā)送消息采用的是異步發(fā)送的方式。
在消息發(fā)送的過程中,涉及到了兩個(gè)線程:main線程和Sender線程,以及一個(gè)線程共享變量:RecordAccumulator。
1、main線程中創(chuàng)建了一個(gè)雙端隊(duì)列RecordAccumulator,將消息發(fā)送給RecordAccumulator。
2、Sender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka broker。
batch.size:只有數(shù)據(jù)積累到batch size之后,sender才會(huì)發(fā)送數(shù)據(jù)。默認(rèn)16k
linger.ms:如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待linger.ms設(shè)置的時(shí)間到了之后就會(huì)發(fā)送數(shù)據(jù)。單位ms,默認(rèn)值0ms,表示沒有延遲。
0:生產(chǎn)者發(fā)送過來(lái)的數(shù)據(jù),不需要等數(shù)據(jù)磁盤應(yīng)答。
1:生產(chǎn)者發(fā)送過來(lái)的數(shù)據(jù),Leader收到數(shù)據(jù)后應(yīng)答。
2:-l(all):生產(chǎn)者發(fā)送過來(lái)的數(shù)據(jù),Leader和SR隊(duì)列里面的所有節(jié)點(diǎn)收起數(shù)據(jù)后應(yīng)答。-l和all等價(jià)。
3.1.2 生產(chǎn)者重要參數(shù)列表
參數(shù)名稱 | 描述 |
---|---|
bootstrap.servers | 生產(chǎn)者連接集群所需的broker地址清單??梢栽O(shè)置1個(gè)或者多個(gè),中間用逗號(hào)隔開。生產(chǎn)者從給定的broker里查找到其它broker信息。 |
key.serializer、value.serializer | 指定發(fā)送消息的key和value的序列化類型。要寫全類名。(反射獲?。?/td> |
buffer.memory | RecordAccumulator緩沖區(qū)大小,默認(rèn)32m。 |
batch.size | 緩沖區(qū)一批數(shù)據(jù)最大值,默認(rèn)16k。適當(dāng)增加該值,可以提高吞吐量,但是如果該值設(shè)置太大,會(huì)導(dǎo)致數(shù)據(jù)傳輸延遲增加。 |
linger.ms | 如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待linger.time之后就會(huì)發(fā)送數(shù)據(jù)。單位ms,默認(rèn)值是0ms,表示沒有延遲。生產(chǎn)環(huán)境建議該值大小為5-100ms之間。 |
acks | 0:生產(chǎn)者發(fā)送過來(lái)的數(shù)據(jù),不需要等數(shù)據(jù)落盤應(yīng)答。1:生產(chǎn)者發(fā)送過來(lái)的數(shù)據(jù),Leader數(shù)據(jù)落盤后應(yīng)答。-1(all):生產(chǎn)者發(fā)送過來(lái)的數(shù)據(jù),Leader和isr隊(duì)列里面的所有節(jié)點(diǎn)數(shù)據(jù)都落盤后應(yīng)答。默認(rèn)值是-1 |
max.in.flight.requests.per.connection | 允許最多沒有返回ack的次數(shù),默認(rèn)為5,開啟冪等性要保證該值是1-5的數(shù)字。 |
Retries(重試) | 當(dāng)消息發(fā)送出現(xiàn)錯(cuò)誤的時(shí)候,系統(tǒng)會(huì)重發(fā)消息。retries表示重試次數(shù)。默認(rèn)是int最大值,2147483647。如果設(shè)置了重試,還想保證消息的有序性,需要設(shè)置MAX_IN_flight_requests_per_connection=1否則在重試此失敗消息的時(shí)候,其它的消息可能發(fā)送成功了。 |
retry.backoff.ms | 兩次重試之間的時(shí)間間隔,默認(rèn)是100ms。 |
enable.idempotence | 是否開啟冪等性,默認(rèn)true,開啟冪等性。 |
compression.type | 生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式。默認(rèn)是none,不壓縮。支持壓縮類型:none、gzip、snappy、lz4和zstd。 |
3.2 異步發(fā)送API
3.2.1 普通異步發(fā)送
1、需求:創(chuàng)建Kafka生產(chǎn)者,采用異步的方式發(fā)送到Kafka broker
2、異步發(fā)送流程如下:
3、代碼編寫
1)創(chuàng)建工程kafka-demo
2)導(dǎo)入依賴
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
3)創(chuàng)建包名:com.atguigu.kafka.producer
4)編寫代碼:不帶回調(diào)函數(shù)的API
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) throws InterruptedException {
// 1. 創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties = new Properties();
// 2. 給kafka配置對(duì)象添加配置信息
properties.put("bootstrap.servers","hadoop102:9092");
// key,value序列化
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 3. 創(chuàng)建kafka生產(chǎn)者對(duì)象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 調(diào)用send方法,發(fā)送消息
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
}
// 5. 關(guān)閉資源
kafkaProducer.close();
}
}
5)測(cè)試:
在hadoop102上開啟kafka消費(fèi)者
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
在IDEA中執(zhí)行上述代碼,觀察hadoop102消費(fèi)者輸出
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
kafka3
……
3.2.2 帶回調(diào)函數(shù)的異步發(fā)送
1、回調(diào)函數(shù)callback()會(huì)在producer受到ack時(shí)調(diào)用,為異步屌用。
該方法有兩個(gè)參數(shù)分別是RecordMetadata(元數(shù)據(jù)信息)和Exception(異常信息)。
1)如果Exception為null,說(shuō)明消息發(fā)送成功。
2)如果Exception不為null,說(shuō)明消息發(fā)送不成功。
2、帶回掉函數(shù)的異步調(diào)用發(fā)送流程
注意:消息發(fā)送失敗會(huì)自動(dòng)重試,不需要我們?cè)诨卣{(diào)函數(shù)中手動(dòng)重試。
3、編寫代碼:帶回調(diào)函數(shù)的生產(chǎn)者
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {
public static void main(String[] args) throws InterruptedException {
// 1. 創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties = new Properties();
// 2. 給kafka配置對(duì)象添加配置信息
properties.put("bootstrap.servers", "hadoop102:9092");
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key,value序列化(必須)
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 3. 創(chuàng)建kafka生產(chǎn)者對(duì)象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 調(diào)用send方法,發(fā)送消息
for (int i = 0; i < 10; i++) {
// 添加回調(diào)
kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i), new Callback() {
// 該方法在Producer收到ack時(shí)調(diào)用,為異步調(diào)用
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null)
// 沒有異常,輸出信息到控制臺(tái)
System.out.println("主題"+recordMetadata.topic() +", 分區(qū):"+recordMetadata.partition()+", 偏移量:"+recordMetadata.offset());
}
});
}
// 5. 關(guān)閉資源
kafkaProducer.close();
}
}
4、測(cè)試
1)在hadoop102上開啟kafka消費(fèi)者
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
2)在IDEA中執(zhí)行代碼,觀察hadoop102消費(fèi)者輸出
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
……
3)在IDEA控制臺(tái)觀察回調(diào)函數(shù)
主題first, 分區(qū):0, 偏移量:10
主題first, 分區(qū):0, 偏移量:11
主題first, 分區(qū):0, 偏移量:12
主題first, 分區(qū):0, 偏移量:13
主題first, 分區(qū):0, 偏移量:14
主題first, 分區(qū):0, 偏移量:15
主題first, 分區(qū):0, 偏移量:16
主題first, 分區(qū):0, 偏移量:17
主題first, 分區(qū):0, 偏移量:18
主題first, 分區(qū):0, 偏移量:19
……
3.3 同步發(fā)送API
1、同步發(fā)送的意思就是,一條消息發(fā)送之后,會(huì)阻塞當(dāng)前線程,直至返回ack。
由于send方法返回的是一個(gè)Future對(duì)象,根據(jù)Future對(duì)象的特點(diǎn),我們也可以實(shí)現(xiàn)同步發(fā)送的效果,只需要調(diào)用Future對(duì)象的get方法即可。
2、同步發(fā)送流程示意圖如下:
3、編寫代碼:同步發(fā)送消息的生產(chǎn)者
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ConsumerProducerSync {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 1. 創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties = new Properties();
// 2. 給kafka配置對(duì)象添加配置信息
//properties.put("bootstrap.servers","hadoop102:9092");
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
// key,value序列化(必須)
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 3. 創(chuàng)建kafka生產(chǎn)者對(duì)象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 調(diào)用send方法,發(fā)送消息
for (int i = 0; i < 10; i++) {
// 同步發(fā)送
kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
}
// 5. 關(guān)閉資源
kafkaProducer.close();
}
}
4、測(cè)試
1)在hadoop102上開啟kafka消費(fèi)者
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
2)在IDEA中執(zhí)行代碼,觀察102消費(fèi)者的消費(fèi)情況
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
……
3.4 生產(chǎn)者分區(qū)
3.4.1 分區(qū)的原因
1、便于合理使用存儲(chǔ)資源,每個(gè)Partition在一個(gè)Broker上存儲(chǔ),可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊的數(shù)據(jù)存儲(chǔ)在多臺(tái)Broker上。合理控制分區(qū)的任務(wù),可以實(shí)現(xiàn)負(fù)載均衡的效果。
2、提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù);消費(fèi)者可以以分區(qū)為單位進(jìn)行消費(fèi)數(shù)據(jù)。
3.4.2 生產(chǎn)者分區(qū)策略
1、默認(rèn)分區(qū)器DefaultPartitioner
The default partitioning strategy:
·If a partition is specified in the record, use it
·If no partition is specified but a key is present choose a partition based on a hash of the key
·If no partition or key is present choose the sticky partition that changes when the batch is full.
public class DefaultPartitioner implements Partitioner {
… …
}
2、使用:
1)我們需要將producer發(fā)送的數(shù)據(jù)封裝成一個(gè)ProducerRecord對(duì)象。
2)上述的分區(qū)策略,我們?cè)赑roducerRecord對(duì)象中進(jìn)行配置。
3)策略實(shí)現(xiàn)
代碼 | 解釋 |
---|---|
ProducerRecord(topic,partition_num,…) | 指明partition的情況下直接發(fā)往指定的分區(qū),key的分配方式將無(wú)效 |
ProducerRecord(topic,key,value) | 沒有指明partition值但有key的情況下:將key的hash值與topic的partition個(gè)數(shù)進(jìn)行取余得到分區(qū)號(hào) |
ProducerRecord(topic,value) | 既沒有partition值又沒有key值得情況下:kafka采用Sticky Partition(黏性分區(qū)器),會(huì)隨機(jī)選擇一個(gè)分區(qū),并盡可能一直使用該分區(qū),待該分區(qū)的batch已滿或者已完成,kafka再隨機(jī)一個(gè)分區(qū)(絕對(duì)不會(huì)是上一個(gè))進(jìn)行使用。 |
3、案例: | |
1)案例1:將數(shù)據(jù)發(fā)送到指定partition的情況下,如:將所有消息發(fā)送到分區(qū)1中。 |
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackPartitions {
public static void main(String[] args) {
// 1. 創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties = new Properties();
// 2. 給kafka配置對(duì)象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
// key,value序列化(必須):
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 3. 創(chuàng)建生產(chǎn)者對(duì)象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 4. 造數(shù)據(jù)
for (int i = 0; i < 5; i++) {
// 指定數(shù)據(jù)發(fā)送到1號(hào)分區(qū),key為空(IDEA中ctrl + p查看參數(shù))
kafkaProducer.send(new ProducerRecord<>("first", 1,"","atguigu " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null){
System.out.println("主題:" + metadata.topic() + "->" + "分區(qū):" + metadata.partition()
);
}else {
e.printStackTrace();
}
}
});
}
kafkaProducer.close();
}
}
2)測(cè)試:
(1)在hadoop102上開啟kafka消費(fèi)者
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
(2)在IDEA中執(zhí)行代碼,觀察hadoop102上的消費(fèi)者消費(fèi)情況
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu0
atguigu1
kafka2
……
(3)觀察IDEA中控制臺(tái)輸出
主題:first->分區(qū):1
主題:first->分區(qū):1
主題:first->分區(qū):1
主題:first->分區(qū):1
主題:first->分區(qū):1
3)案例2:沒有指明partition但是有key的情況下的消費(fèi)者分區(qū)分配
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallbackKey {
public static void main(String[] args) {
// 1. 創(chuàng)建配置對(duì)象
Properties properties = new Properties();
// 2. 配置屬性
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 3. 創(chuàng)建生產(chǎn)者對(duì)象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 4. 造數(shù)據(jù)
for (int i = 1; i < 11; i++) {
// 創(chuàng)建producerRecord對(duì)象
final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
"first",
i + "",// 依次指定key值為i
"atguigu " + i);
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null){
System.out.println("消息:"+producerRecord.value()+", 主題:" + metadata.topic() + "->" + "分區(qū):" + metadata.partition()
);
}else {
e.printStackTrace();
}
}
});
}
kafkaProducer.close();
}
}
4)測(cè)試
觀察IDEA中控制臺(tái)輸出
消息:atguigu 1, 主題:first->分區(qū):0
消息:atguigu 5, 主題:first->分區(qū):0
消息:atguigu 7, 主題:first->分區(qū):0
消息:atguigu 8, 主題:first->分區(qū):0
消息:atguigu 2, 主題:first->分區(qū):2
消息:atguigu 3, 主題:first->分區(qū):2
消息:atguigu 9, 主題:first->分區(qū):2
消息:atguigu 4, 主題:first->分區(qū):1
消息:atguigu 6, 主題:first->分區(qū):1
消息:atguigu 10, 主題:first->分區(qū):1
3.4.3 自定義分區(qū)器
1、生產(chǎn)環(huán)境中,我們往往需要更加自由的分區(qū)需求,我們可以自定義分區(qū)器。
2、需求:在上面的根據(jù)key分區(qū)案例中,我們發(fā)現(xiàn)與我們知道的hash分區(qū)結(jié)果不同。那么我們就實(shí)現(xiàn)一個(gè)。
3、實(shí)現(xiàn)步驟:
1)定義類,實(shí)現(xiàn)Partitioner接口
2)重寫partition()方法
4、代碼實(shí)現(xiàn)
package com.atguigu.kafka.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* @author leon
* @create 2020-12-11 10:43
* 1. 實(shí)現(xiàn)接口Partitioner
* 2. 實(shí)現(xiàn)3個(gè)方法:partition,close,configure
* 3. 編寫partition方法,返回分區(qū)號(hào)
*/
public class MyPartitioner implements Partitioner {
/**
* 分區(qū)方法
**/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 1. 獲取key
String keyStr = key.toString();
// 2. 創(chuàng)建分區(qū)號(hào),返回的結(jié)果
int partNum;
// 3. 計(jì)算key的hash值
int keyStrHash = keyStr.hashCode();
// 4. 獲取topic的分區(qū)個(gè)數(shù)
int partitionNumber = cluster.partitionCountForTopic(topic);
// 5. 計(jì)算分區(qū)號(hào)
partNum = Math.abs(keyStrHash) % partitionNumber;
// 4. 返回分區(qū)號(hào)
return partNum;
}
// 關(guān)閉資源
@Override
public void close() {
}
// 配置方法
@Override
public void configure(Map<String, ?> configs) {
}
}
5、測(cè)試
在生產(chǎn)者代碼中,通過配置對(duì)象,添加自定義分區(qū)器
// 添加自定義分區(qū)器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG," com.atguigu.kafka.partitioner.MyPartitioner ");
在hadoop102上啟動(dòng)kafka消費(fèi)者
[atguigu@hadoop102 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
在IDEA中觀察回調(diào)信息
消息:atguigu 2, 主題:first->分區(qū):2
消息:atguigu 5, 主題:first->分區(qū):2
消息:atguigu 8, 主題:first->分區(qū):2
消息:atguigu 1, 主題:first->分區(qū):1
消息:atguigu 4, 主題:first->分區(qū):1
消息:atguigu 7, 主題:first->分區(qū):1
消息:atguigu 10, 主題:first->分區(qū):1
消息:atguigu 3, 主題:first->分區(qū):0
消息:atguigu 6, 主題:first->分區(qū):0
消息:atguigu 9, 主題:first->分區(qū):0
3.5 生產(chǎn)經(jīng)驗(yàn)-生產(chǎn)者如何提高吞吐量
3.5.1 吞吐量
3.5.2 實(shí)例
1、編寫代碼
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerParameters {
public static void main(String[] args) throws InterruptedException {
// 1. 創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties = new Properties();
// 2. 給kafka配置對(duì)象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key,value序列化(必須):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// batch.size:批次大小,默認(rèn)16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger.ms:等待時(shí)間,默認(rèn)0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// RecordAccumulator:緩沖區(qū)大小,默認(rèn)32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// compression.type:壓縮,默認(rèn)none,可配置值gzip、snappy、lz4和zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
// 3. 創(chuàng)建kafka生產(chǎn)者對(duì)象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 調(diào)用send方法,發(fā)送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","atguigu" + i));
}
// 5. 關(guān)閉資源
kafkaProducer.close();
}
}
2、測(cè)試:
1)在hadoop102上開啟kafka消費(fèi)者
[atguigu@hadoop102 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
2)在IDEA中執(zhí)行代碼,觀察hadoop102上的消費(fèi)者消費(fèi)情況
[atguigu@hadoop102 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu0
atguigu0
atguigu0
……
3.6 生產(chǎn)經(jīng)驗(yàn)-數(shù)據(jù)可靠性
1、回顧消費(fèi)發(fā)送流程
2、ack應(yīng)答機(jī)制
3、ack應(yīng)答級(jí)別
背景:leader收到數(shù)據(jù),所有follower都開始同步數(shù)據(jù),但有一個(gè)follower,因?yàn)槟撤N故障,遲遲不能與leader進(jìn)行同步,那leader就要一直等下去,直到它完成同步,才能發(fā)送ack。這個(gè)問題怎么解決呢?
Kafka提供的解決方案:ISR隊(duì)列
1)Leader維護(hù)了一個(gè)動(dòng)態(tài)的in-sync replica set(ISR)和leader保持同步的follower集合。
2)當(dāng)ISR中的follower完成數(shù)據(jù)的同步之后,leader就會(huì)給producer發(fā)送ack。
3)如果follower長(zhǎng)時(shí)間(replica.lag.time.max.ms)未向leader同步數(shù)據(jù),則該follower將被提出ISR。
Leader發(fā)生故障之后,就會(huì)從ISR中選舉新的leader。
ack應(yīng)答級(jí)別
對(duì)于某些不太重要的數(shù)據(jù),對(duì)數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等ISR中的follower全部接收成功。
所以Kafka為用戶提供了三種可靠性級(jí)別,用戶根據(jù)對(duì)可靠性和延遲的要求進(jìn)行權(quán)衡,選擇以下的配置。
acks=0 | 這一操作提供了一個(gè)最低的延遲,partition的leader副本接收到消息還沒有寫入磁盤就已經(jīng)返回ack,當(dāng)leader故障時(shí)有可能丟失數(shù)據(jù) |
---|---|
acks=1 | partition的leader副本落盤后返回ack,如果在follower副本同步數(shù)據(jù)之前l(fā)eader故障,那么將對(duì)丟失數(shù)據(jù) |
acks=-1 | partition的leader和follower副本全部落盤成功后才返回ack。但是如果在follower副本同步完成后,leader副本所在節(jié)點(diǎn)發(fā)送ack之前,leader副本發(fā)送故障,那么會(huì)造成數(shù)據(jù)重復(fù) |
4、ack應(yīng)答機(jī)制
5、案例
代碼編寫:
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerAck {
public static void main(String[] args) throws InterruptedException {
// 1. 創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
Properties properties = new Properties();
// 2. 給kafka配置對(duì)象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key,value序列化(必須):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 設(shè)置acks
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// 重試次數(shù)retries,默認(rèn)是int最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
// 3. 創(chuàng)建kafka生產(chǎn)者對(duì)象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 4. 調(diào)用send方法,發(fā)送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));
}
// 5. 關(guān)閉資源
kafkaProducer.close();
}
}?
3.7 生產(chǎn)經(jīng)驗(yàn)-數(shù)據(jù)去重
3.7.1 數(shù)據(jù)傳遞語(yǔ)義
至少一次(At Least Once)=ACK級(jí)別設(shè)置為-1+分區(qū)副本大于等于2+ISR里應(yīng)答的最小副本數(shù)量大于等于2
最多一次(At More Once)=ACK級(jí)別設(shè)置為0
總結(jié):
1)At Least Once可以保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù)
2)At More Once可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失
精確一次(Exactly Once):對(duì)于一些非常重要的信息,比如和錢相關(guān)的數(shù)據(jù),要求數(shù)據(jù)既不能重復(fù)也不丟失。
3.7.2 冪等性
1、冪等性原理:
冪等性就是指Producer不論向Broker發(fā)送多少次重復(fù)數(shù)據(jù),Broker端都只會(huì)持久化一條,保證了不重復(fù)。
精確一次(Exactly Once)=冪等性+至少一次(ack=-1+分區(qū)副本數(shù)>=2+ISR最小副本數(shù)量>=2)。
重復(fù)數(shù)據(jù)的判斷標(biāo)準(zhǔn):具有<PID,Partition,SeqNumber>相同主鍵的消息提交時(shí),Broker只會(huì)持久化一條。其中PID是producer每次重啟都會(huì)分配一個(gè)新的:Partition表示分區(qū)號(hào);SequenceNumber是單調(diào)自增的。
所以冪等性只能保證的是在單分區(qū)單會(huì)話內(nèi)不重復(fù)。
2、開啟冪等性
在producer的配置對(duì)象中,添加參數(shù)enable.idempotence,參數(shù)值默認(rèn)為true,設(shè)置為false就關(guān)閉了。
3.7.3 生產(chǎn)者事務(wù)
1、kafka事務(wù)原理
2、事務(wù)代碼流程
// 1初始化事務(wù)
void initTransactions();
// 2開啟事務(wù)
void beginTransaction() throws ProducerFencedException;
// 3在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量(主要用于消費(fèi)者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 4提交事務(wù)
void commitTransaction() throws ProducerFencedException;
// 5放棄事務(wù)(類似于回滾事務(wù)的操作)
void abortTransaction() throws ProducerFencedException;
3.8 生產(chǎn)經(jīng)驗(yàn)-數(shù)據(jù)有序
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-806008.html
3.9生產(chǎn)經(jīng)驗(yàn)-數(shù)據(jù)亂序
1、kafka在1.x版本之前保證單分區(qū)有序,條件如下:
max.in.flight.requests.per.connection=1(不需要考慮是否開啟冪等性)
2、kafka在1.x及以后版本保證數(shù)據(jù)單分區(qū)有序,條件如下:
1)未開啟冪等性
max.in.flight.requests.per.connection需要設(shè)置為1
2)開啟冪等性
max.in.flight.requests.per.connection需要設(shè)置小于等于5
原因說(shuō)明:因?yàn)樵趉afka1.x以后,啟用冪等后,kafka服務(wù)器會(huì)緩存producer發(fā)來(lái)的最近5個(gè)request的元數(shù)據(jù),故無(wú)論如何,都可以保證最近5個(gè)request的數(shù)據(jù)都是有序的。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-806008.html
到了這里,關(guān)于大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!