kafka默認(rèn)配置
每個kafka broker中配置文件server.properties
默認(rèn)必須配置的屬性如下:
broker.id=0
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=1000000
配置文件中參數(shù)說明
#**Server Basics**
#broker的主機(jī)IP地址,一般不設(shè)置
host.name=
#broker服務(wù)端口,生產(chǎn)者和消費(fèi)者再此端口建立連接
port =端口
#broker在集群中的唯一標(biāo)識,正數(shù)
broker.id=1
#后臺任務(wù)處理的線程數(shù)
background.threads=4
#請求隊列最大數(shù)量
queued.max.requests=500
#**Socket Server Settings**
#處理消息的最大線程數(shù),一般情況下數(shù)量為cpu核數(shù)
num.network.threads=3
#處理磁盤IO的線程數(shù),數(shù)值為cpu核數(shù)2倍
num.io.threads=8
#發(fā)送緩沖區(qū)
socket.send.buffer.bytes=1000*1024
#接收緩沖區(qū)
socket.receive.buffer.bytes=1000*1024
#請求的最大數(shù)值
socket.request.max.bytes=104857600
#**Log Basics**
#kafka數(shù)據(jù)的存放位置,多個用逗號分隔;
log.dirs=/kafka/logs
#每個數(shù)據(jù)目錄的線程數(shù),用于啟動時的日志恢復(fù)和關(guān)閉時的刷新
num.recovery.threads.per.data.dir=1
#**Log Retention Policy**
#是否開啟清理
log.cleaner.enable=false
#清理運(yùn)行的線程數(shù)
log.cleaner.threads = 2
#清理時每秒處理的字節(jié)數(shù)
log.cleaner.io.max.bytes.per.second=None
#segment即使沒有達(dá)到log.segment.bytes設(shè)置的大小,也會強(qiáng)制新建一個segment
log.roll.hours =24*7
#數(shù)據(jù)文件保留多長時間
log.retention.minutes=3000
或者
log.retention.hours=50
#清理策略:delete或者compact
log.cleanup.policy = delete
#topic每個分區(qū)的最大文件大小,-1沒有大小限制
log.retention.bytes=-1
#消息體的最大字節(jié),小于socket.request.max.bytes
message.max.bytes=5242880
#自動創(chuàng)建topic時的默認(rèn)副本的個數(shù)
default.replication.factor=3
#為每個分區(qū)設(shè)置獲取的消息的字節(jié)數(shù)
replica.fetch.max.bytes=104857600
#topic的分區(qū)是以segment文件存儲的,這個控制每個segment的大小
log.segment.bytes=1073741824
#文件大小檢查的周期時間
log.retention.check.interval.ms=300000
#清理時候用到的IO緩存大小,一般不需要修改
log.cleaner.io.buffer.size=512*1024
#清理中hash表的擴(kuò)大因子,一般不需要修改
log.cleaner.io.buffer.load.factor =0.9
#檢查是否觸發(fā)清理
log.cleaner.backoff.ms =15000
#清理頻率,越大意味著更高效的清理,但浪費(fèi)空間
log.cleaner.min.cleanable.ratio=0.5
#壓縮信息保留的最長時間,也是客戶端消費(fèi)消息的最長時間
log.cleaner.delete.retention.ms =24*60*60*1000
#segment索引文件大小
log.index.size.max.bytes =10*1024*1024
#fetch操作,需要空間來掃描offset值,值越大掃描速度越快,但浪費(fèi)空間
log.index.interval.bytes =4096
#partiton緩存,每當(dāng)消息記錄數(shù)達(dá)到1000時flush一次數(shù)據(jù)到磁盤
log.flush.interval.messages=1000
#檢查數(shù)據(jù)是否要寫入到硬盤的時間間隔
log.flush.scheduler.interval.ms =3000
#如果消息量始終沒有達(dá)到閥值,但是離上一次磁盤同步的時間間隔達(dá)到閥值,也將觸發(fā)表示每間隔1000毫秒flush一次數(shù)據(jù)到磁盤
log.flush.interval.ms=1000
#文件在索引中清除后保留的時間一般不需要去修改
log.delete.delay.ms =60000
#控制上次flush到硬盤的時間點(diǎn),以便于數(shù)據(jù)恢復(fù)一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000
#**Internal Topic Settings**
#是否允許自動創(chuàng)建topic,若是false,就需要通過命令創(chuàng)建topic
auto.create.topics.enable =true
#每個topic的分區(qū)個數(shù)
num.partitions=3
#topic的offset的備份數(shù),建議3、4個
offsets.topic.replication.factor=3
#事物備份數(shù),建議3、4個
transaction.state.log.replication.factor=3
#事務(wù)重寫備份數(shù),建議3、4個
transaction.state.log.min.isr=3
#**Zookeeper**
#zookeeper集群的地址
zookeeper.connect=IP1:端口,IP2:端口,IP3:端口...
#zookeeper的連接超時時間
zookeeper.connection.timeout.ms =6000
#zookeeper的心跳間隔,不易過大
zookeeper.session.timeout.ms=6000
#zookeeper集群中l(wèi)eader和follower之間的同步時間
zookeeper.sync.time.ms =2000
#**Group Coordinator Settings**
#/*在進(jìn)行第一次重新平衡之前,group協(xié)調(diào)員將等待更多消費(fèi)者加入group的時間,
#延遲時間越長意味著重新平衡的可能性越小,但是等待處理開始的時間增加*/
group.initial.rebalance.delay.ms=3000
生產(chǎn)者配置介紹
-
**bootstrap.servers**
- 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個或多個。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會從給定的broker里尋找其它的broker。 -
**key.serializer和value.serializer**
- broker接收消息必須以字節(jié)數(shù)組byte[]形式存在,KafkaProducer<K,V>和ProducerRecord<K,V>中的泛型就是key和value的類型。key.serializer和value.serializer分別用來指定key和value序列化操作的序列化器,無默認(rèn)值。類的全限定名。 -
retry.backoff.ms
- 用來設(shè)定兩次重試之間的時間間隔,默認(rèn)值100。 -
**partitioner.class**
- 顯示配置使用哪個分區(qū)器。 -
**interceptor.classes**
- 指定自定義攔截器,多個傳List集合。 -
**compression.type**
- 指定消息的壓縮方式,默認(rèn)值為"none",可以配置為"gzip",“snappy”和“l(fā)z4”。 -
**connections.max.idle.ms**
- 用來指定多久之后關(guān)閉閑置的連接,默認(rèn)值540000(ms),即9min -
**receive.buffer.bytes**
- 用來設(shè)置socket接收緩沖區(qū)的大小,默認(rèn)值為32768(B),即32KB,如果設(shè)置為-1,則使用操作系統(tǒng)的默認(rèn)值。 -
**enable.idempotence**
- 是否開啟冪等性功能,默認(rèn)值false -
**max.in.flight.requests.per.connection**
- 限制每個連接,也就是客戶端與Node之間的連接最多緩存請求數(shù),默認(rèn)值5 -
**transactional.id**
- 設(shè)置事物id,必須唯一,默認(rèn)值null -
acks
:指定了必須有多少個分區(qū)副本收到消息,生產(chǎn)者才會認(rèn)為消息寫入是成功的。默認(rèn)為**acks=1
**-
acks=0
如果設(shè)置為 0,則 Producer 不會等待服務(wù)器的反饋。該消息會被立刻添加到 socket buffer 中并認(rèn)為已經(jīng)發(fā)送完成。在這種情況下,服務(wù)器是否收到請求是沒法保證的,并且參數(shù)**retries
**也不會生效(因?yàn)榭蛻舳藷o法獲得失敗信息)。每個記錄返回的 offset 總是被設(shè)置為-1。 -
acks=1
如果設(shè)置為 1,表示只要集群的leader分區(qū)副本接收到了消息,就會向生產(chǎn)者發(fā)送一個成功響應(yīng)的ack,此時生產(chǎn)者接收到ack之后就可以認(rèn)為該消息是寫入成功的。leader 節(jié)點(diǎn)會將記錄寫入本地日志,并且在所有 follower 節(jié)點(diǎn)反饋之前就先確認(rèn)成功。在這種情況下,如果 leader 節(jié)點(diǎn)在接收記錄之后,并且在 follower 節(jié)點(diǎn)復(fù)制數(shù)據(jù)完成之前產(chǎn)生錯誤,則這條記錄會丟失。 -
acks=all
如果設(shè)置為 all,這就意味著 leader 節(jié)點(diǎn)會等待**所有同步中的副本(ISR)**確認(rèn)之后再確認(rèn)這條記錄是否發(fā)送完成。只要至少有一個同步副本存在,記錄就不會丟失。這種方式是對請求傳遞的最有效保證。acks=-1 與 acks=all 是等效的。注意:
這里是所有的isr內(nèi)副本,min.insync.replicas只是一個最低限制,即同步副本少于該配置值,則會拋異常,如果ISR中的副本數(shù)小于min.insync.replicas,消息只能讀,不能寫入。
-
-
buffer.memory
:用來設(shè)置 Producer 緩沖區(qū)大小。 -
compression.type
:Producer 生成數(shù)據(jù)時可使用的壓縮類型。默認(rèn)值是 none(即不壓縮)??膳渲玫膲嚎s類型包括:none
、gzip
、snappy
、lz4
或zstd
。壓縮是針對批處理的所有數(shù)據(jù),所以批處理的效果也會影響壓縮比(更多的批處理意味著更好的壓縮)。 -
retries
:用來設(shè)置發(fā)送失敗的重試次數(shù)。 -
batch.size
:用來設(shè)置一個批次可占用的內(nèi)存大小。 -
linger.ms
:用來設(shè)置 Producer 在發(fā)送批次前的等待時間。 -
client.id
:Kafka 服務(wù)器用它來識別消息源,可以是任意字符串。 -
max.in.flight.requests.per.connection
:用來設(shè)置Producer在單個連接上能夠發(fā)送的未響應(yīng)請求的個數(shù)。設(shè)置此值是1表示kafka broker在響應(yīng)請求之前client不能再向同一個broker發(fā)送請求。默認(rèn)值為5。如果設(shè)置1,可以避免生產(chǎn)者發(fā)送消息亂序,雖然吞吐量降低了,但是安全性得到了提升,要權(quán)衡業(yè)務(wù)場景配置。(比如生產(chǎn)者發(fā)送兩條順序消息1,2,都是異步發(fā)送,同步發(fā)送性能低,如果2成功,1因?yàn)榫W(wǎng)絡(luò)問題重試發(fā)送成功,1就到2后面,亂序了)。 -
timeout.ms
:用來設(shè)置 Broker 等待同步副本返回消息確認(rèn)的時間,與acks
的配置相匹配。 -
request.timeout.ms
:Producer 在發(fā)送數(shù)據(jù)時等待服務(wù)器返回響應(yīng)的時間。 -
max.block.ms
:該配置控制KafkaProducer.send()
和**KafkaProducer.partitionsFor()
** 允許被阻塞的時長。這些方法可能因?yàn)榫彌_區(qū)滿了或者元數(shù)據(jù)不可用而被阻塞。用戶提供的序列化程序或分區(qū)程序的阻塞將不會被計算到這個超時。 -
max.request.size
:請求的最大字節(jié)數(shù)。 -
receieve.buffer.bytes
:TCP 接收緩沖區(qū)的大小。 -
send.buffer.bytes
- 用來設(shè)置socket發(fā)送緩沖區(qū)的大小,默認(rèn)值為131072(B),即128KB,如果設(shè)置為-1,則使用操作系統(tǒng)默認(rèn)值。 -
request.timeout.ms
- 配置Producer等待請求響應(yīng)的最長時間,默認(rèn)值為30000(ms),請求超時之后可以進(jìn)行重試。注意這個參數(shù)需要比broker端參數(shù)replica.lag.time.max.ms的值要大,這樣可以減少因客戶端重試而引起的消息重復(fù)的概率。 -
**buffer.memory**
- 生產(chǎn)者客戶端RecordAccumulator緩存大小,默認(rèn)值為33554432B,即32M。 -
**metadata.max.age.ms**
- 當(dāng)客戶端超過這個時間間隔時就會更新元數(shù)據(jù)信息默認(rèn)值300000,即5分鐘。元數(shù)據(jù)指集群中有哪些主題,主題有哪些分區(qū),每個分區(qū)leader副本在哪個節(jié)點(diǎn)上,follower副本在哪個節(jié)點(diǎn)上,哪些副本在AR,ISR等集合中,集群中有哪些節(jié)點(diǎn)等等。 -
**max.request.size**
- 用來限制生產(chǎn)者客戶端能發(fā)送的消息的最大值,默認(rèn)值為1048576B,即1MB。這個參數(shù)涉及到其它參數(shù)的聯(lián)動,比如broker端的message.max.bytes參數(shù)。對kafka沒有足夠把控的時候不要更改此參數(shù)。 -
producer.type
:該參數(shù)指定了在后臺線程中消息的發(fā)送方式是同步的還是異步的,默認(rèn)是sync的方式,即producer.type=sync。如果設(shè)置成異步的模式,即producer.type=async,但是這樣會增加丟失數(shù)據(jù)的風(fēng)險。如果需要確保消息的可靠性,必須要將producer.type設(shè)置為sync。 -
queue.buffering.max.ms
默認(rèn)值:5000。啟用異步模式時,producer緩存消息的時間。比如我們設(shè)置成1000時,它會緩存1s的數(shù)據(jù)再一次發(fā)送出去,這樣可以極大的增加broker吞吐量,但也會造成時效性的降低。 -
queue.buffering.max.messages
默認(rèn)值:10000。啟用異步模式時,producer緩存隊列里最大緩存的消息數(shù)量,如果超過這個值,producer就會阻塞或者丟掉消息。 -
queue.enqueue.timeout.ms
默認(rèn)值:-1。當(dāng)達(dá)到上面參數(shù)時producer會阻塞等待的時間。如果設(shè)置為0,buffer隊列滿時producer不會阻塞,消息直接被丟掉;若設(shè)置為-1,producer會被阻塞,不會丟消息。 -
batch.num.messages
默認(rèn)值:200。啟用異步模式時,一個batch緩存的消息數(shù)量。達(dá)到這個數(shù)值時,producer才會發(fā)送消息。(每次批量發(fā)送的數(shù)量)
消費(fèi)者配置介紹
-
bootstrap.servers
- Broker 集群地址,格式:ip1:port,ip2:port…,不需要設(shè)定全部的集群地址,設(shè)置兩個或者兩個以上即可。 -
group.id
- 消費(fèi)者隸屬的消費(fèi)者組名稱,如果為空會報異常,一般而言,這個參數(shù)要有一定的業(yè)務(wù)意義。 -
fetch.min.bytes
- 消費(fèi)者獲取記錄的最小字節(jié)數(shù)。Kafka 會等到有足夠的數(shù)據(jù)時才返回消息給消費(fèi)者,以降低負(fù)載。 -
**fetch.max.bytes**
- 單次獲取數(shù)據(jù)的最大消息數(shù)。 -
fetch.max.wait.ms
- Kafka 需要等待足夠的數(shù)據(jù)才返回給消費(fèi)者,如果一直沒有足夠的數(shù)據(jù),消費(fèi)者就會遲遲收不到消息。所以需要指定 Broker 的等待延遲,一旦超時,直接返回數(shù)據(jù)給消費(fèi)者。 -
max.partition.fetch.bytes
- 指定了服務(wù)器從每個分區(qū)返回給消費(fèi)者的最大字節(jié)數(shù)。默認(rèn)為 1 MB。 -
session.timeout.ms
- 指定了消費(fèi)者的心跳超時時間。如果消費(fèi)者沒有在有效時間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器,協(xié)調(diào)器會視消費(fèi)者已經(jīng)消亡,從而觸發(fā)分區(qū)再均衡。默認(rèn)為 3 秒。 -
enable.auto.commit
- 指定了是否自動提交消息偏移量,默認(rèn)開啟。 - **
partition.assignment.strategy
**消費(fèi)者的分區(qū)分配策略。-
Range
- 表示會將主題的若干個連續(xù)的分區(qū)分配給消費(fèi)者。 -
RoundRobin
- 表示會將主題的所有分區(qū)按照輪詢方式分配給消費(fèi)者。
-
-
client.id
- 客戶端標(biāo)識。 -
max.poll.records
- 一次性拉取的條數(shù),這個參數(shù)用來配置 Consumer 在一次拉取請求中拉取的最大消息數(shù),默認(rèn)值為500(條)。如果消息的大小都比較小,則可以適當(dāng)調(diào)大這個參數(shù)值來提升一定的消費(fèi)速度。 -
max.poll.interval.ms
- -
receive.buffer.bytes
- 用于設(shè)置 Socket 接收消息緩沖區(qū)(SO_RECBUF)的大小,默認(rèn)值為 64KB。如果設(shè)置為-1,則使用操作系統(tǒng)的默認(rèn)值。 -
send.buffer.bytes
- 用于設(shè)置 Socket 發(fā)送消息緩沖區(qū)(SO_SNDBUF)的大小,默認(rèn)值為 128KB。與 receive.buffer.bytes 參數(shù)一樣,如果設(shè)置為-1,則使用操作系統(tǒng)的默認(rèn)值。 -
connections.max.idle.ms
- 這個參數(shù)用來指定在多久之后關(guān)閉閑置的連接,默認(rèn)值是540000(ms),即9分鐘。 -
exclude.internal.topics
- Kafka 中有兩個內(nèi)部的主題:__consumer_offsets
和__transaction_state
。exclude.internal.topics 用來指定 Kafka 中的內(nèi)部主題是否可以向消費(fèi)者公開,默認(rèn)值為 true。如果設(shè)置為 true,那么只能使用 subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式來訂閱內(nèi)部主題,設(shè)置為 false 則沒有這個限制。 -
receive.buffer.bytes
- 這個參數(shù)用來設(shè)置 Socket 接收消息緩沖區(qū)(SO_RECBUF)的大小,默認(rèn)值為65536(B),即64KB。如果設(shè)置為-1,則使用操作系統(tǒng)的默認(rèn)值。如果 Consumer 與 Kafka 處于不同的機(jī)房,則可以適當(dāng)調(diào)大這個參數(shù)值。 -
**send.buffer.bytes**
- 這個參數(shù)用來設(shè)置Socket發(fā)送消息緩沖區(qū)(SO_SNDBUF)的大小,默認(rèn)值為131072(B),即128KB。與receive.buffer.bytes參數(shù)一樣,如果設(shè)置為-1,則使用操作系統(tǒng)的默認(rèn)值 -
**request.timeout.ms**
- 這個參數(shù)用來配置 Consumer 等待請求響應(yīng)的最長時間,默認(rèn)值為30000(ms)。 -
**metadata.max.age.ms**
- 這個參數(shù)用來配置元數(shù)據(jù)的過期時間,默認(rèn)值為300000(ms),即5分鐘。如果元數(shù)據(jù)在此參數(shù)所限定的時間范圍內(nèi)沒有進(jìn)行更新,則會被強(qiáng)制更新,即使沒有任何分區(qū)變化或有新的 broker 加入 -
**reconnect.backoff.ms**
- 這個參數(shù)用來配置嘗試重新連接指定主機(jī)之前的等待時間(也稱為退避時間),避免頻繁地連接主機(jī),默認(rèn)值為50(ms)。這種機(jī)制適用于消費(fèi)者向 broker 發(fā)送的所有請求。 -
**auto.offset.reset**
- 在 Kafka 中,每當(dāng)消費(fèi)者組內(nèi)的消費(fèi)者查找不到所記錄的消費(fèi)位移或發(fā)生位移越界時,就會根據(jù)消費(fèi)者客戶端參數(shù) auto.offset.reset 的配置來決定從何處開始進(jìn)行消費(fèi),這個參數(shù)的默認(rèn)值為 “l(fā)atest” 。-
**earliest**
:當(dāng)各分區(qū)下存在已提交的 offset 時,從提交的 offset 開始消費(fèi);無提交的 offset 時,從頭開始消費(fèi)。 -
**latest**
:當(dāng)各分區(qū)下存在已提交的 offset 時,從提交的 offset 開始消費(fèi);無提交的 offset 時,消費(fèi)該分區(qū)下新產(chǎn)生的數(shù)據(jù)。 -
none
:topic 各分區(qū)都存在已提交的 offset 時,從 offset 后開始消費(fèi);只要有一個分區(qū)不存在已提交的offset,則拋出異常。
-
-
interceptor.class
- 用來配置消費(fèi)者客戶端的攔截器
--------------------------------------歡迎叨擾此地址---------------------------------------文章來源:http://www.zghlxwxcb.cn/news/detail-532476.html
本文作者:Java技術(shù)債務(wù)
原文鏈接:https://cuizb.top/myblog/article/1677833372
版權(quán)聲明: 本博客所有文章除特別聲明外,均采用 CC BY 3.0 CN協(xié)議進(jìn)行許可。轉(zhuǎn)載請署名作者且注明文章出處。文章來源地址http://www.zghlxwxcb.cn/news/detail-532476.html
到了這里,關(guān)于kafka生產(chǎn)者和消費(fèi)者配置介紹的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!