国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者)

這篇具有很好參考價(jià)值的文章主要介紹了大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

第 1 章:Kafka概述

1.1 定義

Kafka是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列,主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。
發(fā)布/訂閱:消息的發(fā)布者不會(huì)將消息直接發(fā)送給特定的訂閱者,而是將發(fā)布的消息分為不同的類別,訂閱者只接收感興趣的消息。

1.2 消息隊(duì)列

目前企業(yè)中比較常見的消息隊(duì)列產(chǎn)品主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。
在大多數(shù)場(chǎng)景主要采用Kafka作為消息隊(duì)列
在JavaEE開發(fā)中主要采用ActiveMQ、RabbitMQ、RocketMQ

1.2.1 傳統(tǒng)消息隊(duì)列的應(yīng)用場(chǎng)景

1、傳統(tǒng)的消費(fèi)隊(duì)列的主要應(yīng)用場(chǎng)景有:緩存/削峰(緩沖)、解耦(少依賴)、異步通信(不必要及時(shí)處理)
1)緩存/削峰(緩沖):有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

2)解耦:允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵循同樣的接口約束。
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

3)異步通信:允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它,然后再需要的時(shí)候再去處理它們。

大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

1.2.2 消息隊(duì)列的兩種模式

消息隊(duì)列主要分為兩種模式:點(diǎn)對(duì)點(diǎn)模式(一個(gè)生產(chǎn)者對(duì)口一個(gè)消費(fèi)者)和發(fā)布/訂閱模式(一對(duì)多)
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

1.3 Kafka基礎(chǔ)框架

大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

1、Producer:消息生產(chǎn)者,就是向Kafka broker發(fā)消息的客戶端
2、Consumer:消息消費(fèi)者,向kafka broker獲取消息的客戶端
3、Consumer Group(CG):消費(fèi)者組,由多個(gè)consumer組成。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)broker可以由多個(gè)不同的topic,一個(gè)topic下的一個(gè)分區(qū)只能被一個(gè)消費(fèi)者組內(nèi)的一個(gè)消費(fèi)者所消費(fèi);消費(fèi)者之間不受影響。消費(fèi)者組是邏輯上的一個(gè)訂閱者。
4、Broker:一個(gè)kafka服務(wù)器就是一個(gè)broker。一個(gè)broker可以容納多個(gè)不同topic
5、Topic:可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者面向的都是一個(gè)topic
6、Partition:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的topic可以分布到多個(gè)broker(即服務(wù)器)上,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition是一個(gè)有序的隊(duì)列
7、Replica:副本,為保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的partition數(shù)據(jù)不丟失,且kafka仍然能夠繼續(xù)工作,kafka提供了副本機(jī)制,一個(gè)topic的每個(gè)發(fā)你去都有若干個(gè)副本,一個(gè)leader和若干個(gè)follower
8、leader:每個(gè)分區(qū)副本中的”主“,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是leader
9、followeer:每個(gè)分區(qū)副本中的“從”,實(shí)現(xiàn)于leader副本保持同步,在leader發(fā)送故障時(shí),稱為新的leader

第 2 章:Kafka快速入門

2.1 安裝部署

2.1.1 集群部署

2.1.2 集群部署

1、官方下載地址:http://kafka.apache.org/downloads.html
2、上傳安裝包到102的/opt/software目錄下:

[atguigu@hadoop102 software]$ ll
-rw-rw-r--. 1 atguigu atguigu  86486610 3月  10 12:33 kafka_2.12-3.0.0.tgz

3、解壓安裝包到/opt/module/目錄下

[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

4、進(jìn)入到/opt/module目錄下,修改解壓包名為kafka

[atguigu@hadoop102 module]$ mv kafka_2.12-3.0.0 kafka

5、修改config目錄下的配置文件server.properties內(nèi)容如下

[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties
#broker的全局唯一編號(hào),不能重復(fù),只能是數(shù)字。
broker.id=102
#處理網(wǎng)絡(luò)請(qǐng)求的線程數(shù)量
num.network.threads=3
#用來(lái)處理磁盤IO的線程數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka運(yùn)行日志(數(shù)據(jù))存放的路徑,路徑不需要提前創(chuàng)建,kafka自動(dòng)幫你創(chuàng)建,可以配置多個(gè)磁盤路徑,路徑與路徑之間可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在當(dāng)前broker上的分區(qū)個(gè)數(shù)
num.partitions=1
#用來(lái)恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
# 每個(gè)topic創(chuàng)建時(shí)的副本數(shù),默認(rèn)時(shí)1個(gè)副本
offsets.topic.replication.factor=1
#segment文件保留的最長(zhǎng)時(shí)間,超時(shí)將被刪除
log.retention.hours=168
#每個(gè)segment文件的大小,默認(rèn)最大1G
log.segment.bytes=1073741824
# 檢查過期數(shù)據(jù)的時(shí)間,默認(rèn)5分鐘檢查一次是否數(shù)據(jù)過期
log.retention.check.interval.ms=300000
#配置連接Zookeeper集群地址(在zk根目錄下創(chuàng)建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

6、配置環(huán)境變量

[atguigu@hadoop102 kafka]$ sudo vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[atguigu@hadoop102 kafka]$ source /etc/profile

7、分發(fā)環(huán)境變量文件并source

[atguigu@hadoop102 kafka]$ xsync /etc/profile.d/my_env.sh
==================== hadoop102 ====================
sending incremental file list

sent 47 bytes  received 12 bytes  39.33 bytes/sec
total size is 371  speedup is 6.29
==================== hadoop103 ====================
sending incremental file list
my_env.sh
rsync: mkstemp "/etc/profile.d/.my_env.sh.Sd7MUA" failed: Permission denied (13)

sent 465 bytes  received 126 bytes  394.00 bytes/sec

total size is 371  speedup is 0.63
rsync error: some files/attrs were not transferred (see previous errors) (code 23) at main.c(1178) [sender=3.1.2]
==================== hadoop104 ====================
sending incremental file list
my_env.sh
rsync: mkstemp "/etc/profile.d/.my_env.sh.vb8jRj" failed: Permission denied (13)

sent 465 bytes  received 126 bytes  1,182.00 bytes/sec
total size is 371  speedup is 0.63
rsync error: some files/attrs were not transferred (see previous errors) (code 23) at main.c(1178) [sender=3.1.2],
# 這時(shí)你覺得適用sudo就可以了,但是真的是這樣嗎?
[atguigu@hadoop102 kafka]$ sudo xsync /etc/profile.d/my_env.sh
sudo: xsync:找不到命令
# 這時(shí)需要將xsync的命令文件,copy到/usr/bin/下,sudo(root)才能找到xsync命令
[atguigu@hadoop102 kafka]$ sudo cp /home/atguigu/bin/xsync /usr/bin/
[atguigu@hadoop102 kafka]$ sudo xsync /etc/profile.d/my_env.sh
# 在每個(gè)節(jié)點(diǎn)上執(zhí)行source命令,如何你沒有xcall腳本,就手動(dòng)在三臺(tái)節(jié)點(diǎn)上執(zhí)行source命令。
[atguigu@hadoop102 kafka]$ xcall source /etc/profile

8、分發(fā)安裝包

[atguigu@hadoop102 module]$ xsync kafka/

9、修改配置文件的brokerid
分別在hadoop103和104上修改配置文件server.properties中的broker.id=103、broker.id=104
注:broker.id不得重復(fù)

[atguigu@hadoop103 kafka]$ vim config/server.properties
broker.id=103	
[atguigu@hadoop104 kafka]$ vim config/server.properties
broker.id=104

10、啟動(dòng)集群
1)先啟動(dòng)Zookeeper集群

[atguigu@hadoop102 kafka]$ zk.sh start 

2)一次在102、103、104節(jié)點(diǎn)啟動(dòng)kafka

[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties	[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties	[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

11、關(guān)閉集群

[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh	
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh	
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh

2.1.4 kafka群起腳本

1、腳本編寫
在/home/atguigu/bin目錄下創(chuàng)建文件kafka.sh腳本文件:

#! /bin/bash
if (($#==0)); then
  echo -e "請(qǐng)輸入?yún)?shù):\n start  啟動(dòng)kafka集群;\n stop  停止kafka集群;\n" && exit
fi

case $1 in
  "start")
    for host in hadoop103 hadoop102 hadoop104
      do
        echo "---------- $1 $host 的kafka ----------"
        ssh $host "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
      done
      ;;
  "stop")
    for host in hadoop103 hadoop102 hadoop104
      do
        echo "---------- $1 $host 的kafka ----------"
        ssh $host "/opt/module/kafka/bin/kafka-server-stop.sh"
      done
      ;;
    *)
        echo -e "---------- 請(qǐng)輸入正確的參數(shù) ----------\n"
        echo -e "start  啟動(dòng)kafka集群;\n stop  停止kafka集群;\n" && exit
      ;;
esac

2、腳本文件添加權(quán)限

[atguigu@hadoop102 bin]$ chmod +x kafka.sh

注意:
停止Kafka集群時(shí),一定要等kafka所有節(jié)點(diǎn)進(jìn)程全部停止后再停止Zookeeper集群。
因?yàn)閆ookeeper集群當(dāng)中記錄著kafka集群相關(guān)信息,Zookeeper集群一旦先停止,Kafka集群就沒有辦法再獲取停止進(jìn)程的信息,只能手動(dòng)殺死Kafka進(jìn)程了。

2.2 Kafka命令行操作

大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

2.2.1 主題命令行操作

1、查看操作主題命令需要的參數(shù)

2、重要的參數(shù)如下

參數(shù) 描述
–bootstrap-server 連接kafka Broker主機(jī)名稱和端口號(hào)
–topic 操作的topic名稱
–create 創(chuàng)建主題
–delete 刪除主題
–alter 修改主題
–list 查看所有主題
–describe 查看主題詳細(xì)描述
–partitions 設(shè)置主題分區(qū)數(shù)
–replication-factor 設(shè)置主題分區(qū)副本
–config 更新系統(tǒng)默認(rèn)的配置

3、查看當(dāng)前服務(wù)器中的所有topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list

4、創(chuàng)建一個(gè)主題名稱為first的topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 3 --partitions 3 --topic first

5、查看topic的詳情

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
Topic: first    TopicId: EVV4qHcSR_q0O8YyD32gFg PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: first    Partition: 0    Leader: 102     Replicas: 102,103,104   Isr: 102,103,104

6、修改分區(qū)數(shù)(注意:分區(qū)數(shù)只能增加,不能減少)

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3

7、再次查看Topic的詳情

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
Topic: first    TopicId: EVV4qHcSR_q0O8YyD32gFg PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: first    Partition: 0    Leader: 102     Replicas: 102,103,104   Isr: 102,103,104
        Topic: first    Partition: 1    Leader: 103     Replicas: 103,104,102   Isr: 103,104,102
        Topic: first    Partition: 2    Leader: 104     Replicas: 104,102,103   Isr: 104,102,103

8、刪除topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first

2.2.2 生產(chǎn)者命令行操作

1、查看命令行生產(chǎn)者的參數(shù)

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh 

2、重要的參數(shù)如下:

參數(shù) 描述
–bootstrap-server 連接kafka Broker主機(jī)名稱和端口號(hào)
–topic 操作的topic名稱
3、生產(chǎn)消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world
>atguigu  atguigu

2.2.3 消費(fèi)者命令行操作

1、查看命令行消費(fèi)者的參數(shù)

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh 

2、重要的參數(shù)如下:

參數(shù) 描述
–bootstrap-server 連接kafka Broker主機(jī)名稱和端口號(hào)
–topic 操作的topic名稱
–from-beginning 從頭開始消費(fèi)
–group 指定消費(fèi)者組名稱
3、消費(fèi)消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

4、從頭開始消費(fèi)

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

思考:再次查看當(dāng)前kafka中的topic列表,發(fā)現(xiàn)了什么?為什么?

第 3 章:Kafka生產(chǎn)者

3.1 生產(chǎn)者消息發(fā)送流程

3.1.1 發(fā)送原理

Kafka的Producer發(fā)送消息采用的是異步發(fā)送的方式。
在消息發(fā)送的過程中,涉及到了兩個(gè)線程:main線程和Sender線程,以及一個(gè)線程共享變量:RecordAccumulator。
1、main線程中創(chuàng)建了一個(gè)雙端隊(duì)列RecordAccumulator,將消息發(fā)送給RecordAccumulator。
2、Sender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka broker。
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

batch.size:只有數(shù)據(jù)積累到batch size之后,sender才會(huì)發(fā)送數(shù)據(jù)。默認(rèn)16k
linger.ms:如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待linger.ms設(shè)置的時(shí)間到了之后就會(huì)發(fā)送數(shù)據(jù)。單位ms,默認(rèn)值0ms,表示沒有延遲。
0:生產(chǎn)者發(fā)送過來(lái)的數(shù)據(jù),不需要等數(shù)據(jù)磁盤應(yīng)答。
1:生產(chǎn)者發(fā)送過來(lái)的數(shù)據(jù),Leader收到數(shù)據(jù)后應(yīng)答。
2:-l(all):生產(chǎn)者發(fā)送過來(lái)的數(shù)據(jù),Leader和SR隊(duì)列里面的所有節(jié)點(diǎn)收起數(shù)據(jù)后應(yīng)答。-l和all等價(jià)。

3.1.2 生產(chǎn)者重要參數(shù)列表

參數(shù)名稱 描述
bootstrap.servers 生產(chǎn)者連接集群所需的broker地址清單??梢栽O(shè)置1個(gè)或者多個(gè),中間用逗號(hào)隔開。生產(chǎn)者從給定的broker里查找到其它broker信息。
key.serializer、value.serializer 指定發(fā)送消息的key和value的序列化類型。要寫全類名。(反射獲?。?/td>
buffer.memory RecordAccumulator緩沖區(qū)大小,默認(rèn)32m。
batch.size 緩沖區(qū)一批數(shù)據(jù)最大值,默認(rèn)16k。適當(dāng)增加該值,可以提高吞吐量,但是如果該值設(shè)置太大,會(huì)導(dǎo)致數(shù)據(jù)傳輸延遲增加。
linger.ms 如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待linger.time之后就會(huì)發(fā)送數(shù)據(jù)。單位ms,默認(rèn)值是0ms,表示沒有延遲。生產(chǎn)環(huán)境建議該值大小為5-100ms之間。
acks 0:生產(chǎn)者發(fā)送過來(lái)的數(shù)據(jù),不需要等數(shù)據(jù)落盤應(yīng)答。1:生產(chǎn)者發(fā)送過來(lái)的數(shù)據(jù),Leader數(shù)據(jù)落盤后應(yīng)答。-1(all):生產(chǎn)者發(fā)送過來(lái)的數(shù)據(jù),Leader和isr隊(duì)列里面的所有節(jié)點(diǎn)數(shù)據(jù)都落盤后應(yīng)答。默認(rèn)值是-1
max.in.flight.requests.per.connection 允許最多沒有返回ack的次數(shù),默認(rèn)為5,開啟冪等性要保證該值是1-5的數(shù)字。
Retries(重試) 當(dāng)消息發(fā)送出現(xiàn)錯(cuò)誤的時(shí)候,系統(tǒng)會(huì)重發(fā)消息。retries表示重試次數(shù)。默認(rèn)是int最大值,2147483647。如果設(shè)置了重試,還想保證消息的有序性,需要設(shè)置MAX_IN_flight_requests_per_connection=1否則在重試此失敗消息的時(shí)候,其它的消息可能發(fā)送成功了。
retry.backoff.ms 兩次重試之間的時(shí)間間隔,默認(rèn)是100ms。
enable.idempotence 是否開啟冪等性,默認(rèn)true,開啟冪等性。
compression.type 生產(chǎn)者發(fā)送的所有數(shù)據(jù)的壓縮方式。默認(rèn)是none,不壓縮。支持壓縮類型:none、gzip、snappy、lz4和zstd。

3.2 異步發(fā)送API

3.2.1 普通異步發(fā)送

1、需求:創(chuàng)建Kafka生產(chǎn)者,采用異步的方式發(fā)送到Kafka broker
2、異步發(fā)送流程如下:
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

3、代碼編寫
1)創(chuàng)建工程kafka-demo
2)導(dǎo)入依賴

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
</dependencies>

3)創(chuàng)建包名:com.atguigu.kafka.producer
4)編寫代碼:不帶回調(diào)函數(shù)的API

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) throws InterruptedException {
        // 1. 創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties = new Properties();

        // 2. 給kafka配置對(duì)象添加配置信息
        properties.put("bootstrap.servers","hadoop102:9092");

        // key,value序列化
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 3. 創(chuàng)建kafka生產(chǎn)者對(duì)象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 調(diào)用send方法,發(fā)送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
        }

        // 5. 關(guān)閉資源
        kafkaProducer.close();
    }
} 

5)測(cè)試:
在hadoop102上開啟kafka消費(fèi)者

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

在IDEA中執(zhí)行上述代碼,觀察hadoop102消費(fèi)者輸出

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
kafka3
……

3.2.2 帶回調(diào)函數(shù)的異步發(fā)送

1、回調(diào)函數(shù)callback()會(huì)在producer受到ack時(shí)調(diào)用,為異步屌用。
該方法有兩個(gè)參數(shù)分別是RecordMetadata(元數(shù)據(jù)信息)和Exception(異常信息)。
1)如果Exception為null,說(shuō)明消息發(fā)送成功。
2)如果Exception不為null,說(shuō)明消息發(fā)送不成功。
2、帶回掉函數(shù)的異步調(diào)用發(fā)送流程
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

注意:消息發(fā)送失敗會(huì)自動(dòng)重試,不需要我們?cè)诨卣{(diào)函數(shù)中手動(dòng)重試。
3、編寫代碼:帶回調(diào)函數(shù)的生產(chǎn)者

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerCallback {
    public static void main(String[] args) throws InterruptedException {
        // 1. 創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties = new Properties();

        // 2. 給kafka配置對(duì)象添加配置信息
        properties.put("bootstrap.servers", "hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必須)
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 3. 創(chuàng)建kafka生產(chǎn)者對(duì)象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 調(diào)用send方法,發(fā)送消息
        for (int i = 0; i < 10; i++) {
            // 添加回調(diào)
            kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i), new Callback() {
                // 該方法在Producer收到ack時(shí)調(diào)用,為異步調(diào)用
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) 
                        // 沒有異常,輸出信息到控制臺(tái)
                        System.out.println("主題"+recordMetadata.topic() +", 分區(qū):"+recordMetadata.partition()+", 偏移量:"+recordMetadata.offset());
                }
            });
        }

        // 5. 關(guān)閉資源
        kafkaProducer.close();
    }
}

4、測(cè)試
1)在hadoop102上開啟kafka消費(fèi)者

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

2)在IDEA中執(zhí)行代碼,觀察hadoop102消費(fèi)者輸出

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
……

3)在IDEA控制臺(tái)觀察回調(diào)函數(shù)

主題first, 分區(qū):0, 偏移量:10
主題first, 分區(qū):0, 偏移量:11
主題first, 分區(qū):0, 偏移量:12
主題first, 分區(qū):0, 偏移量:13
主題first, 分區(qū):0, 偏移量:14
主題first, 分區(qū):0, 偏移量:15
主題first, 分區(qū):0, 偏移量:16
主題first, 分區(qū):0, 偏移量:17
主題first, 分區(qū):0, 偏移量:18
主題first, 分區(qū):0, 偏移量:19
……

3.3 同步發(fā)送API

1、同步發(fā)送的意思就是,一條消息發(fā)送之后,會(huì)阻塞當(dāng)前線程,直至返回ack。
由于send方法返回的是一個(gè)Future對(duì)象,根據(jù)Future對(duì)象的特點(diǎn),我們也可以實(shí)現(xiàn)同步發(fā)送的效果,只需要調(diào)用Future對(duì)象的get方法即可。
2、同步發(fā)送流程示意圖如下:
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

3、編寫代碼:同步發(fā)送消息的生產(chǎn)者

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ConsumerProducerSync {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 1. 創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties = new Properties();

        // 2. 給kafka配置對(duì)象添加配置信息
        //properties.put("bootstrap.servers","hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        // key,value序列化(必須)
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 3. 創(chuàng)建kafka生產(chǎn)者對(duì)象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 調(diào)用send方法,發(fā)送消息
        for (int i = 0; i < 10; i++) {

            // 同步發(fā)送
            kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
        }

        // 5. 關(guān)閉資源
        kafkaProducer.close();
    }
}

4、測(cè)試
1)在hadoop102上開啟kafka消費(fèi)者

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

2)在IDEA中執(zhí)行代碼,觀察102消費(fèi)者的消費(fèi)情況

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
……

3.4 生產(chǎn)者分區(qū)

3.4.1 分區(qū)的原因

1、便于合理使用存儲(chǔ)資源,每個(gè)Partition在一個(gè)Broker上存儲(chǔ),可以把海量的數(shù)據(jù)按照分區(qū)切割成一塊一塊的數(shù)據(jù)存儲(chǔ)在多臺(tái)Broker上。合理控制分區(qū)的任務(wù),可以實(shí)現(xiàn)負(fù)載均衡的效果。
2、提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù);消費(fèi)者可以以分區(qū)為單位進(jìn)行消費(fèi)數(shù)據(jù)。
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

3.4.2 生產(chǎn)者分區(qū)策略

1、默認(rèn)分區(qū)器DefaultPartitioner

The default partitioning strategy:
·If a partition is specified in the record, use it
·If no partition is specified but a key is present choose a partition based on a hash of the key
·If no partition or key is present choose the sticky partition that changes when the batch is full.
public class DefaultPartitioner implements Partitioner {
… …
}

2、使用:
1)我們需要將producer發(fā)送的數(shù)據(jù)封裝成一個(gè)ProducerRecord對(duì)象。
2)上述的分區(qū)策略,我們?cè)赑roducerRecord對(duì)象中進(jìn)行配置。
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

3)策略實(shí)現(xiàn)

代碼 解釋
ProducerRecord(topic,partition_num,…) 指明partition的情況下直接發(fā)往指定的分區(qū),key的分配方式將無(wú)效
ProducerRecord(topic,key,value) 沒有指明partition值但有key的情況下:將key的hash值與topic的partition個(gè)數(shù)進(jìn)行取余得到分區(qū)號(hào)
ProducerRecord(topic,value) 既沒有partition值又沒有key值得情況下:kafka采用Sticky Partition(黏性分區(qū)器),會(huì)隨機(jī)選擇一個(gè)分區(qū),并盡可能一直使用該分區(qū),待該分區(qū)的batch已滿或者已完成,kafka再隨機(jī)一個(gè)分區(qū)(絕對(duì)不會(huì)是上一個(gè))進(jìn)行使用。
3、案例:
1)案例1:將數(shù)據(jù)發(fā)送到指定partition的情況下,如:將所有消息發(fā)送到分區(qū)1中。
package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) {

        // 1. 創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties = new Properties();

        // 2. 給kafka配置對(duì)象添加配置信息
      properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        // key,value序列化(必須):
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 3. 創(chuàng)建生產(chǎn)者對(duì)象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 4. 造數(shù)據(jù)
        for (int i = 0; i < 5; i++) {
            // 指定數(shù)據(jù)發(fā)送到1號(hào)分區(qū),key為空(IDEA中ctrl + p查看參數(shù))
            kafkaProducer.send(new ProducerRecord<>("first", 1,"","atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null){
                        System.out.println("主題:" + metadata.topic() + "->"  + "分區(qū):" + metadata.partition()
                        );
                    }else {
                        e.printStackTrace();
                    }
                }
            });
        }

        kafkaProducer.close();
    }
}

2)測(cè)試:
(1)在hadoop102上開啟kafka消費(fèi)者

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

(2)在IDEA中執(zhí)行代碼,觀察hadoop102上的消費(fèi)者消費(fèi)情況

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu0
atguigu1
kafka2
……

(3)觀察IDEA中控制臺(tái)輸出

主題:first->分區(qū):1
主題:first->分區(qū):1
主題:first->分區(qū):1
主題:first->分區(qū):1
主題:first->分區(qū):1

3)案例2:沒有指明partition但是有key的情況下的消費(fèi)者分區(qū)分配

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerCallbackKey {
    public static void main(String[] args) {
        // 1. 創(chuàng)建配置對(duì)象
        Properties properties = new Properties();
        // 2. 配置屬性
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 3. 創(chuàng)建生產(chǎn)者對(duì)象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // 4. 造數(shù)據(jù)
        for (int i = 1; i < 11; i++) {
            // 創(chuàng)建producerRecord對(duì)象
            final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                        "first", 
                        i + "",// 依次指定key值為i
                        "atguigu " + i);
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null){
                        System.out.println("消息:"+producerRecord.value()+", 主題:" + metadata.topic() + "->" + "分區(qū):" + metadata.partition()
                        );
                    }else {
                        e.printStackTrace();
                    }
                }
            });
        }

        kafkaProducer.close();
    }
}

4)測(cè)試
觀察IDEA中控制臺(tái)輸出

消息:atguigu 1, 主題:first->分區(qū):0
消息:atguigu 5, 主題:first->分區(qū):0
消息:atguigu 7, 主題:first->分區(qū):0
消息:atguigu 8, 主題:first->分區(qū):0
消息:atguigu 2, 主題:first->分區(qū):2
消息:atguigu 3, 主題:first->分區(qū):2
消息:atguigu 9, 主題:first->分區(qū):2
消息:atguigu 4, 主題:first->分區(qū):1
消息:atguigu 6, 主題:first->分區(qū):1
消息:atguigu 10, 主題:first->分區(qū):1

3.4.3 自定義分區(qū)器

1、生產(chǎn)環(huán)境中,我們往往需要更加自由的分區(qū)需求,我們可以自定義分區(qū)器。
2、需求:在上面的根據(jù)key分區(qū)案例中,我們發(fā)現(xiàn)與我們知道的hash分區(qū)結(jié)果不同。那么我們就實(shí)現(xiàn)一個(gè)。
3、實(shí)現(xiàn)步驟:
1)定義類,實(shí)現(xiàn)Partitioner接口
2)重寫partition()方法
4、代碼實(shí)現(xiàn)

package com.atguigu.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

/**
 * @author leon
 * @create 2020-12-11 10:43
 * 1. 實(shí)現(xiàn)接口Partitioner
 * 2. 實(shí)現(xiàn)3個(gè)方法:partition,close,configure
 * 3. 編寫partition方法,返回分區(qū)號(hào)
 */
public class MyPartitioner implements Partitioner {
    /**
    *  分區(qū)方法
    **/
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
       // 1. 獲取key
        String keyStr = key.toString();
        // 2. 創(chuàng)建分區(qū)號(hào),返回的結(jié)果
        int partNum;
        // 3. 計(jì)算key的hash值
        int keyStrHash = keyStr.hashCode();
        // 4. 獲取topic的分區(qū)個(gè)數(shù)
        int partitionNumber = cluster.partitionCountForTopic(topic);
        // 5. 計(jì)算分區(qū)號(hào)
        partNum = Math.abs(keyStrHash) % partitionNumber;
        // 4. 返回分區(qū)號(hào)
        return partNum;
    }

    // 關(guān)閉資源
    @Override
    public void close() {
    }

    // 配置方法
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

5、測(cè)試
在生產(chǎn)者代碼中,通過配置對(duì)象,添加自定義分區(qū)器

// 添加自定義分區(qū)器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG," com.atguigu.kafka.partitioner.MyPartitioner ");

在hadoop102上啟動(dòng)kafka消費(fèi)者

[atguigu@hadoop102 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

在IDEA中觀察回調(diào)信息

消息:atguigu 2, 主題:first->分區(qū):2
消息:atguigu 5, 主題:first->分區(qū):2
消息:atguigu 8, 主題:first->分區(qū):2
消息:atguigu 1, 主題:first->分區(qū):1
消息:atguigu 4, 主題:first->分區(qū):1
消息:atguigu 7, 主題:first->分區(qū):1
消息:atguigu 10, 主題:first->分區(qū):1
消息:atguigu 3, 主題:first->分區(qū):0
消息:atguigu 6, 主題:first->分區(qū):0
消息:atguigu 9, 主題:first->分區(qū):0

3.5 生產(chǎn)經(jīng)驗(yàn)-生產(chǎn)者如何提高吞吐量

3.5.1 吞吐量

大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

3.5.2 實(shí)例

1、編寫代碼

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class CustomProducerParameters {

    public static void main(String[] args) throws InterruptedException {

        // 1. 創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties = new Properties();

        // 2. 給kafka配置對(duì)象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        
        // key,value序列化(必須):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // batch.size:批次大小,默認(rèn)16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

        // linger.ms:等待時(shí)間,默認(rèn)0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // RecordAccumulator:緩沖區(qū)大小,默認(rèn)32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        // compression.type:壓縮,默認(rèn)none,可配置值gzip、snappy、lz4和zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

        // 3. 創(chuàng)建kafka生產(chǎn)者對(duì)象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 調(diào)用send方法,發(fā)送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu" + i));
        }

        // 5. 關(guān)閉資源
        kafkaProducer.close();
    }
}

2、測(cè)試:
1)在hadoop102上開啟kafka消費(fèi)者

[atguigu@hadoop102 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

2)在IDEA中執(zhí)行代碼,觀察hadoop102上的消費(fèi)者消費(fèi)情況

[atguigu@hadoop102 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu0
atguigu0
atguigu0
……

3.6 生產(chǎn)經(jīng)驗(yàn)-數(shù)據(jù)可靠性

1、回顧消費(fèi)發(fā)送流程
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

2、ack應(yīng)答機(jī)制
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

3、ack應(yīng)答級(jí)別
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

背景:leader收到數(shù)據(jù),所有follower都開始同步數(shù)據(jù),但有一個(gè)follower,因?yàn)槟撤N故障,遲遲不能與leader進(jìn)行同步,那leader就要一直等下去,直到它完成同步,才能發(fā)送ack。這個(gè)問題怎么解決呢?
Kafka提供的解決方案:ISR隊(duì)列
1)Leader維護(hù)了一個(gè)動(dòng)態(tài)的in-sync replica set(ISR)和leader保持同步的follower集合。
2)當(dāng)ISR中的follower完成數(shù)據(jù)的同步之后,leader就會(huì)給producer發(fā)送ack。
3)如果follower長(zhǎng)時(shí)間(replica.lag.time.max.ms)未向leader同步數(shù)據(jù),則該follower將被提出ISR。
Leader發(fā)生故障之后,就會(huì)從ISR中選舉新的leader。
ack應(yīng)答級(jí)別
對(duì)于某些不太重要的數(shù)據(jù),對(duì)數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等ISR中的follower全部接收成功。
所以Kafka為用戶提供了三種可靠性級(jí)別,用戶根據(jù)對(duì)可靠性和延遲的要求進(jìn)行權(quán)衡,選擇以下的配置。

acks=0 這一操作提供了一個(gè)最低的延遲,partition的leader副本接收到消息還沒有寫入磁盤就已經(jīng)返回ack,當(dāng)leader故障時(shí)有可能丟失數(shù)據(jù)
acks=1 partition的leader副本落盤后返回ack,如果在follower副本同步數(shù)據(jù)之前l(fā)eader故障,那么將對(duì)丟失數(shù)據(jù)
acks=-1 partition的leader和follower副本全部落盤成功后才返回ack。但是如果在follower副本同步完成后,leader副本所在節(jié)點(diǎn)發(fā)送ack之前,leader副本發(fā)送故障,那么會(huì)造成數(shù)據(jù)重復(fù)

4、ack應(yīng)答機(jī)制
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

5、案例
代碼編寫:

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class CustomProducerAck {
    public static void main(String[] args) throws InterruptedException {

        // 1. 創(chuàng)建kafka生產(chǎn)者的配置對(duì)象
        Properties properties = new Properties();

        // 2. 給kafka配置對(duì)象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        // key,value序列化(必須):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 設(shè)置acks
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        // 重試次數(shù)retries,默認(rèn)是int最大值,2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        // 3. 創(chuàng)建kafka生產(chǎn)者對(duì)象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 調(diào)用send方法,發(fā)送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));
        }

        // 5. 關(guān)閉資源
        kafkaProducer.close();
    }
}?

3.7 生產(chǎn)經(jīng)驗(yàn)-數(shù)據(jù)去重

3.7.1 數(shù)據(jù)傳遞語(yǔ)義

至少一次(At Least Once)=ACK級(jí)別設(shè)置為-1+分區(qū)副本大于等于2+ISR里應(yīng)答的最小副本數(shù)量大于等于2
最多一次(At More Once)=ACK級(jí)別設(shè)置為0
總結(jié):
1)At Least Once可以保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù)
2)At More Once可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失
精確一次(Exactly Once):對(duì)于一些非常重要的信息,比如和錢相關(guān)的數(shù)據(jù),要求數(shù)據(jù)既不能重復(fù)也不丟失。

3.7.2 冪等性

1、冪等性原理:
冪等性就是指Producer不論向Broker發(fā)送多少次重復(fù)數(shù)據(jù),Broker端都只會(huì)持久化一條,保證了不重復(fù)。
精確一次(Exactly Once)=冪等性+至少一次(ack=-1+分區(qū)副本數(shù)>=2+ISR最小副本數(shù)量>=2)。
重復(fù)數(shù)據(jù)的判斷標(biāo)準(zhǔn):具有<PID,Partition,SeqNumber>相同主鍵的消息提交時(shí),Broker只會(huì)持久化一條。其中PID是producer每次重啟都會(huì)分配一個(gè)新的:Partition表示分區(qū)號(hào);SequenceNumber是單調(diào)自增的。
所以冪等性只能保證的是在單分區(qū)單會(huì)話內(nèi)不重復(fù)。
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

2、開啟冪等性
在producer的配置對(duì)象中,添加參數(shù)enable.idempotence,參數(shù)值默認(rèn)為true,設(shè)置為false就關(guān)閉了。

3.7.3 生產(chǎn)者事務(wù)

1、kafka事務(wù)原理
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

2、事務(wù)代碼流程

// 1初始化事務(wù)
void initTransactions();
// 2開啟事務(wù)
void beginTransaction() throws ProducerFencedException;
// 3在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量(主要用于消費(fèi)者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;
// 4提交事務(wù)
void commitTransaction() throws ProducerFencedException;
// 5放棄事務(wù)(類似于回滾事務(wù)的操作)
void abortTransaction() throws ProducerFencedException;

3.8 生產(chǎn)經(jīng)驗(yàn)-數(shù)據(jù)有序

大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式

3.9生產(chǎn)經(jīng)驗(yàn)-數(shù)據(jù)亂序

1、kafka在1.x版本之前保證單分區(qū)有序,條件如下:
max.in.flight.requests.per.connection=1(不需要考慮是否開啟冪等性)
2、kafka在1.x及以后版本保證數(shù)據(jù)單分區(qū)有序,條件如下:
1)未開啟冪等性
max.in.flight.requests.per.connection需要設(shè)置為1
2)開啟冪等性
max.in.flight.requests.per.connection需要設(shè)置小于等于5
原因說(shuō)明:因?yàn)樵趉afka1.x以后,啟用冪等后,kafka服務(wù)器會(huì)緩存producer發(fā)來(lái)的最近5個(gè)request的元數(shù)據(jù),故無(wú)論如何,都可以保證最近5個(gè)request的數(shù)據(jù)都是有序的。
大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者),大數(shù)據(jù),kafka,分布式文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-806008.html

到了這里,關(guān)于大數(shù)據(jù)開發(fā)之Kafka(概述、快速入門、生產(chǎn)者)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • kafka入門,生產(chǎn)者異步發(fā)送、回調(diào)函數(shù),同步發(fā)送(四)

    引入依賴 回調(diào)函數(shù)會(huì)在producer收到ack時(shí)調(diào)用,該方法有兩個(gè)參數(shù),分別是元數(shù)據(jù)信息(RecordMetadata)和異常信息(Exception),如果Exception為null,說(shuō)明信息發(fā)送失敗 注意:消息發(fā)送失敗會(huì)自動(dòng)重試,不需要我們?cè)诨卣{(diào)函數(shù)中手動(dòng)重試。 只需在異步發(fā)送的基礎(chǔ)上,再調(diào)用一下 get(

    2024年02月11日
    瀏覽(28)
  • kafka生產(chǎn)者api和數(shù)據(jù)操作

    kafka生產(chǎn)者api和數(shù)據(jù)操作

    發(fā)送流程 消息發(fā)送過程中涉及到兩個(gè)線程—— main線程和Sender線程 main線程 使用serializer(并非java默認(rèn))序列化數(shù)據(jù),使用partitioner確認(rèn)發(fā)送分區(qū) 在main線程中創(chuàng)建了一個(gè)雙端隊(duì)列RecordAccumulator,main線程將批次數(shù)據(jù)發(fā)送給RecordAccumulator。 創(chuàng)建批次數(shù)據(jù)是從內(nèi)存池中分配內(nèi)存,在

    2024年02月13日
    瀏覽(23)
  • Kafka - 獲取 Topic 生產(chǎn)者發(fā)布數(shù)據(jù)命令

    從頭開始獲取 20 條數(shù)據(jù)(等價(jià)于時(shí)間升序) ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic --from-beginning --max-messages 20 獲取最新 20 條數(shù)據(jù)(等價(jià)于時(shí)間降序)去掉 --from-beginning 即可(默認(rèn)) ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic?--max-me

    2024年02月14日
    瀏覽(22)
  • Kafka3.0.0版本——生產(chǎn)者 數(shù)據(jù)去重

    Kafka3.0.0版本——生產(chǎn)者 數(shù)據(jù)去重

    1.1、至少一次 至少一次(At Least Once )的含義 生產(chǎn)者發(fā)送數(shù)據(jù)到kafka集群,kafka集群至少接收到一次數(shù)據(jù)。 至少一次的條件: ACK級(jí)別設(shè)置為-1 + 分區(qū)副本大于等于2 + ISR里應(yīng)答的最小副本數(shù)量大于等于2 1.2、最多一次 最多一次(At Most Once )的含義 生產(chǎn)者發(fā)送數(shù)據(jù)到kafka集群,

    2024年02月01日
    瀏覽(19)
  • Kafka3.0.0版本——生產(chǎn)者數(shù)據(jù)有序與亂序

    Kafka3.0.0版本——生產(chǎn)者數(shù)據(jù)有序與亂序

    單分區(qū)內(nèi),數(shù)據(jù)有序。如下圖partion0、partion1、partion2分區(qū)內(nèi),各自分區(qū)內(nèi)的數(shù)據(jù)有序。 2.1、kafka1.x版本之前保證數(shù)據(jù)單分區(qū)有序的條件 kafka在1.x版本之前保證數(shù)據(jù)單分區(qū)有序,條件如下: 2.2、kafka1.x版本及以后保證數(shù)據(jù)單分區(qū)有序的條件 未開啟冪等性 開啟冪等性 2.3、kafka1

    2023年04月27日
    瀏覽(29)
  • 第3、4章 Kafka 生產(chǎn)者 和 消費(fèi)者 ——向 Kafka 寫入數(shù)據(jù) 和讀取數(shù)據(jù)

    第3、4章 Kafka 生產(chǎn)者 和 消費(fèi)者 ——向 Kafka 寫入數(shù)據(jù) 和讀取數(shù)據(jù)

    重要的特性: 消息通過 隊(duì)列來(lái)進(jìn)行交換 每條消息僅會(huì)傳遞給一個(gè)消費(fèi)者 消息傳遞有先后順序,消息被消費(fèi)后從隊(duì)列刪除(除非使用了消息優(yōu)先級(jí)) 生產(chǎn)者或者消費(fèi)者可以動(dòng)態(tài)加入 傳送模型: 異步即發(fā)即棄:生產(chǎn)者發(fā)送一條消息,不會(huì)等待收到一個(gè)響應(yīng) 異步請(qǐng)求、應(yīng)答:

    2024年02月20日
    瀏覽(21)
  • SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

    SparkStreaming學(xué)習(xí)——讀取socket的數(shù)據(jù)和kafka生產(chǎn)者的消息

    目錄 一、Spark Streaming概述 二、添加依賴 三、配置log4j 1.依賴下載好后打開IDEA最左側(cè)的外部庫(kù) 2.找到spark-core 3.找到apache.spark目錄 4.找到log4j-defaults.properties文件 5.將該文件放在資源目錄下,并修改文件名 6.修改log4j.properties第19行的內(nèi)容 四、Spark Streaming讀取Socket數(shù)據(jù)流 1.代碼編

    2023年04月27日
    瀏覽(19)
  • java:Kafka生產(chǎn)者推送數(shù)據(jù)與消費(fèi)者接收數(shù)據(jù)(參數(shù)配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式為host1:port1,host2:port2,…。生產(chǎn)者會(huì)從這些Broker中選擇一個(gè)可用的Broker作為消息發(fā)送的目標(biāo)Broker。 acks :Broker對(duì)消息的確認(rèn)模式??蛇x值為0、1、all。0表示生產(chǎn)者不會(huì)等待Broker的任何確認(rèn)消息;1表示生產(chǎn)者會(huì)等待Broker的Leader副本確認(rèn)

    2024年02月16日
    瀏覽(35)
  • kafka-保證數(shù)據(jù)不重復(fù)-生產(chǎn)者開啟冪等性和事務(wù)的作用?

    kafka-保證數(shù)據(jù)不重復(fù)-生產(chǎn)者開啟冪等性和事務(wù)的作用?

    適用于消息在寫入到服務(wù)器日志后,由于網(wǎng)絡(luò)故障,生產(chǎn)者沒有及時(shí)收到服務(wù)端的 ACK 消息,生產(chǎn)者誤以為消息沒有持久化到服務(wù)端,導(dǎo)致生產(chǎn)者重復(fù)發(fā)送該消息,造成了消息的重復(fù)現(xiàn)象,而冪等性就是為了解決該問題。 通過3個(gè)值的唯一性去重: PID:生產(chǎn)者ID 分區(qū)號(hào) seq:?jiǎn)?/p>

    2024年02月14日
    瀏覽(17)
  • Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    Kafka生產(chǎn)者原理 kafka生產(chǎn)者發(fā)送流程 kafka消息發(fā)送到集群步驟 kafka如何發(fā)送消息 kafka詳解

    kafka尚硅谷視頻: 10_尚硅谷_Kafka_生產(chǎn)者_(dá)原理_嗶哩嗶哩_bilibili ? ???? 1. producer初始化:加載默認(rèn)配置,以及配置的參數(shù),開啟網(wǎng)絡(luò)線程 ???? 2. 攔截器攔截 ???? 3. 序列化器進(jìn)行消息key, value序列化 ???? 4. 進(jìn)行分區(qū) ???? 5. kafka broker集群 獲取metaData ???? 6. 消息緩存到

    2024年02月11日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包