国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka生產(chǎn)者和消費(fèi)者配置介紹

這篇具有很好參考價值的文章主要介紹了kafka生產(chǎn)者和消費(fèi)者配置介紹。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

kafka默認(rèn)配置

每個kafka broker中配置文件server.properties默認(rèn)必須配置的屬性如下:

broker.id=0
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=1000000

配置文件中參數(shù)說明

#**Server Basics**
#broker的主機(jī)IP地址,一般不設(shè)置
host.name=
#broker服務(wù)端口,生產(chǎn)者和消費(fèi)者再此端口建立連接
port =端口
#broker在集群中的唯一標(biāo)識,正數(shù)
broker.id=1
#后臺任務(wù)處理的線程數(shù)
background.threads=4
#請求隊列最大數(shù)量
queued.max.requests=500
#**Socket Server Settings**
#處理消息的最大線程數(shù),一般情況下數(shù)量為cpu核數(shù)
num.network.threads=3
#處理磁盤IO的線程數(shù),數(shù)值為cpu核數(shù)2倍
num.io.threads=8
#發(fā)送緩沖區(qū)
socket.send.buffer.bytes=1000*1024
#接收緩沖區(qū)
socket.receive.buffer.bytes=1000*1024
#請求的最大數(shù)值
socket.request.max.bytes=104857600
#**Log Basics**
#kafka數(shù)據(jù)的存放位置,多個用逗號分隔;
log.dirs=/kafka/logs
#每個數(shù)據(jù)目錄的線程數(shù),用于啟動時的日志恢復(fù)和關(guān)閉時的刷新
num.recovery.threads.per.data.dir=1
#**Log Retention Policy**    
#是否開啟清理
log.cleaner.enable=false
#清理運(yùn)行的線程數(shù)
log.cleaner.threads = 2
#清理時每秒處理的字節(jié)數(shù)
log.cleaner.io.max.bytes.per.second=None
#segment即使沒有達(dá)到log.segment.bytes設(shè)置的大小,也會強(qiáng)制新建一個segment
log.roll.hours =24*7
#數(shù)據(jù)文件保留多長時間
log.retention.minutes=3000
或者
log.retention.hours=50
#清理策略:delete或者compact
log.cleanup.policy = delete
#topic每個分區(qū)的最大文件大小,-1沒有大小限制
log.retention.bytes=-1
#消息體的最大字節(jié),小于socket.request.max.bytes
message.max.bytes=5242880
#自動創(chuàng)建topic時的默認(rèn)副本的個數(shù)
default.replication.factor=3
#為每個分區(qū)設(shè)置獲取的消息的字節(jié)數(shù)
replica.fetch.max.bytes=104857600
#topic的分區(qū)是以segment文件存儲的,這個控制每個segment的大小
log.segment.bytes=1073741824
#文件大小檢查的周期時間
log.retention.check.interval.ms=300000
#清理時候用到的IO緩存大小,一般不需要修改
log.cleaner.io.buffer.size=512*1024
#清理中hash表的擴(kuò)大因子,一般不需要修改
log.cleaner.io.buffer.load.factor =0.9
#檢查是否觸發(fā)清理
log.cleaner.backoff.ms =15000
#清理頻率,越大意味著更高效的清理,但浪費(fèi)空間
log.cleaner.min.cleanable.ratio=0.5
#壓縮信息保留的最長時間,也是客戶端消費(fèi)消息的最長時間
log.cleaner.delete.retention.ms =24*60*60*1000
#segment索引文件大小
log.index.size.max.bytes =10*1024*1024
#fetch操作,需要空間來掃描offset值,值越大掃描速度越快,但浪費(fèi)空間
log.index.interval.bytes =4096
#partiton緩存,每當(dāng)消息記錄數(shù)達(dá)到1000時flush一次數(shù)據(jù)到磁盤
log.flush.interval.messages=1000
#檢查數(shù)據(jù)是否要寫入到硬盤的時間間隔
log.flush.scheduler.interval.ms =3000
#如果消息量始終沒有達(dá)到閥值,但是離上一次磁盤同步的時間間隔達(dá)到閥值,也將觸發(fā)表示每間隔1000毫秒flush一次數(shù)據(jù)到磁盤
log.flush.interval.ms=1000
#文件在索引中清除后保留的時間一般不需要去修改
log.delete.delay.ms =60000
#控制上次flush到硬盤的時間點(diǎn),以便于數(shù)據(jù)恢復(fù)一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000
#**Internal Topic Settings**
#是否允許自動創(chuàng)建topic,若是false,就需要通過命令創(chuàng)建topic
auto.create.topics.enable =true
#每個topic的分區(qū)個數(shù)
num.partitions=3
#topic的offset的備份數(shù),建議3、4個
offsets.topic.replication.factor=3
#事物備份數(shù),建議3、4個
transaction.state.log.replication.factor=3
#事務(wù)重寫備份數(shù),建議3、4個
transaction.state.log.min.isr=3
#**Zookeeper**
#zookeeper集群的地址
zookeeper.connect=IP1:端口,IP2:端口,IP3:端口...
#zookeeper的連接超時時間
zookeeper.connection.timeout.ms =6000
#zookeeper的心跳間隔,不易過大
zookeeper.session.timeout.ms=6000
#zookeeper集群中l(wèi)eader和follower之間的同步時間
zookeeper.sync.time.ms =2000
#**Group Coordinator Settings**
#/*在進(jìn)行第一次重新平衡之前,group協(xié)調(diào)員將等待更多消費(fèi)者加入group的時間,
#延遲時間越長意味著重新平衡的可能性越小,但是等待處理開始的時間增加*/
group.initial.rebalance.delay.ms=3000

生產(chǎn)者配置介紹

  • **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個或多個。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會從給定的broker里尋找其它的broker。
  • **key.serializer和value.serializer** - broker接收消息必須以字節(jié)數(shù)組byte[]形式存在,KafkaProducer<K,V>和ProducerRecord<K,V>中的泛型就是key和value的類型。key.serializer和value.serializer分別用來指定key和value序列化操作的序列化器,無默認(rèn)值。類的全限定名。
  • retry.backoff.ms - 用來設(shè)定兩次重試之間的時間間隔,默認(rèn)值100。
  • **partitioner.class** - 顯示配置使用哪個分區(qū)器。
  • **interceptor.classes** - 指定自定義攔截器,多個傳List集合。
  • **compression.type** - 指定消息的壓縮方式,默認(rèn)值為"none",可以配置為"gzip",“snappy”和“l(fā)z4”。
  • **connections.max.idle.ms** - 用來指定多久之后關(guān)閉閑置的連接,默認(rèn)值540000(ms),即9min
  • **receive.buffer.bytes** - 用來設(shè)置socket接收緩沖區(qū)的大小,默認(rèn)值為32768(B),即32KB,如果設(shè)置為-1,則使用操作系統(tǒng)的默認(rèn)值。
  • **enable.idempotence** - 是否開啟冪等性功能,默認(rèn)值false
  • **max.in.flight.requests.per.connection** - 限制每個連接,也就是客戶端與Node之間的連接最多緩存請求數(shù),默認(rèn)值5
  • **transactional.id** - 設(shè)置事物id,必須唯一,默認(rèn)值null
  • acks:指定了必須有多少個分區(qū)副本收到消息,生產(chǎn)者才會認(rèn)為消息寫入是成功的。默認(rèn)為**acks=1**
    • acks=0 如果設(shè)置為 0,則 Producer 不會等待服務(wù)器的反饋。該消息會被立刻添加到 socket buffer 中并認(rèn)為已經(jīng)發(fā)送完成。在這種情況下,服務(wù)器是否收到請求是沒法保證的,并且參數(shù)**retries**也不會生效(因?yàn)榭蛻舳藷o法獲得失敗信息)。每個記錄返回的 offset 總是被設(shè)置為-1。

    • acks=1 如果設(shè)置為 1,表示只要集群的leader分區(qū)副本接收到了消息,就會向生產(chǎn)者發(fā)送一個成功響應(yīng)的ack,此時生產(chǎn)者接收到ack之后就可以認(rèn)為該消息是寫入成功的。leader 節(jié)點(diǎn)會將記錄寫入本地日志,并且在所有 follower 節(jié)點(diǎn)反饋之前就先確認(rèn)成功。在這種情況下,如果 leader 節(jié)點(diǎn)在接收記錄之后,并且在 follower 節(jié)點(diǎn)復(fù)制數(shù)據(jù)完成之前產(chǎn)生錯誤,則這條記錄會丟失。

    • acks=all 如果設(shè)置為 all,這就意味著 leader 節(jié)點(diǎn)會等待**所有同步中的副本(ISR)**確認(rèn)之后再確認(rèn)這條記錄是否發(fā)送完成。只要至少有一個同步副本存在,記錄就不會丟失。這種方式是對請求傳遞的最有效保證。acks=-1 與 acks=all 是等效的。

      注意:
      這里是所有的isr內(nèi)副本,min.insync.replicas只是一個最低限制,即同步副本少于該配置值,則會拋異常,如果ISR中的副本數(shù)小于min.insync.replicas,消息只能讀,不能寫入。

  • buffer.memory:用來設(shè)置 Producer 緩沖區(qū)大小。
  • compression.type:Producer 生成數(shù)據(jù)時可使用的壓縮類型。默認(rèn)值是 none(即不壓縮)??膳渲玫膲嚎s類型包括:none、gzipsnappy 、lz4zstd。壓縮是針對批處理的所有數(shù)據(jù),所以批處理的效果也會影響壓縮比(更多的批處理意味著更好的壓縮)。
  • retries:用來設(shè)置發(fā)送失敗的重試次數(shù)。
  • batch.size:用來設(shè)置一個批次可占用的內(nèi)存大小。
  • linger.ms:用來設(shè)置 Producer 在發(fā)送批次前的等待時間。
  • client.id:Kafka 服務(wù)器用它來識別消息源,可以是任意字符串。
  • max.in.flight.requests.per.connection:用來設(shè)置Producer在單個連接上能夠發(fā)送的未響應(yīng)請求的個數(shù)。設(shè)置此值是1表示kafka broker在響應(yīng)請求之前client不能再向同一個broker發(fā)送請求。默認(rèn)值為5。如果設(shè)置1,可以避免生產(chǎn)者發(fā)送消息亂序,雖然吞吐量降低了,但是安全性得到了提升,要權(quán)衡業(yè)務(wù)場景配置。(比如生產(chǎn)者發(fā)送兩條順序消息1,2,都是異步發(fā)送,同步發(fā)送性能低,如果2成功,1因?yàn)榫W(wǎng)絡(luò)問題重試發(fā)送成功,1就到2后面,亂序了)。
  • timeout.ms:用來設(shè)置 Broker 等待同步副本返回消息確認(rèn)的時間,與 acks 的配置相匹配。
  • request.timeout.ms:Producer 在發(fā)送數(shù)據(jù)時等待服務(wù)器返回響應(yīng)的時間。
  • max.block.ms:該配置控制 KafkaProducer.send() 和**KafkaProducer.partitionsFor()** 允許被阻塞的時長。這些方法可能因?yàn)榫彌_區(qū)滿了或者元數(shù)據(jù)不可用而被阻塞。用戶提供的序列化程序或分區(qū)程序的阻塞將不會被計算到這個超時。
  • max.request.size:請求的最大字節(jié)數(shù)。
  • receieve.buffer.bytes:TCP 接收緩沖區(qū)的大小。
  • send.buffer.bytes - 用來設(shè)置socket發(fā)送緩沖區(qū)的大小,默認(rèn)值為131072(B),即128KB,如果設(shè)置為-1,則使用操作系統(tǒng)默認(rèn)值。
  • request.timeout.ms - 配置Producer等待請求響應(yīng)的最長時間,默認(rèn)值為30000(ms),請求超時之后可以進(jìn)行重試。注意這個參數(shù)需要比broker端參數(shù)replica.lag.time.max.ms的值要大,這樣可以減少因客戶端重試而引起的消息重復(fù)的概率。
  • **buffer.memory** - 生產(chǎn)者客戶端RecordAccumulator緩存大小,默認(rèn)值為33554432B,即32M。
  • **metadata.max.age.ms** - 當(dāng)客戶端超過這個時間間隔時就會更新元數(shù)據(jù)信息默認(rèn)值300000,即5分鐘。元數(shù)據(jù)指集群中有哪些主題,主題有哪些分區(qū),每個分區(qū)leader副本在哪個節(jié)點(diǎn)上,follower副本在哪個節(jié)點(diǎn)上,哪些副本在AR,ISR等集合中,集群中有哪些節(jié)點(diǎn)等等。
  • **max.request.size** - 用來限制生產(chǎn)者客戶端能發(fā)送的消息的最大值,默認(rèn)值為1048576B,即1MB。這個參數(shù)涉及到其它參數(shù)的聯(lián)動,比如broker端的message.max.bytes參數(shù)。對kafka沒有足夠把控的時候不要更改此參數(shù)。
  • producer.type:該參數(shù)指定了在后臺線程中消息的發(fā)送方式是同步的還是異步的,默認(rèn)是sync的方式,即producer.type=sync。如果設(shè)置成異步的模式,即producer.type=async,但是這樣會增加丟失數(shù)據(jù)的風(fēng)險。如果需要確保消息的可靠性,必須要將producer.type設(shè)置為sync。
  • queue.buffering.max.ms 默認(rèn)值:5000。啟用異步模式時,producer緩存消息的時間。比如我們設(shè)置成1000時,它會緩存1s的數(shù)據(jù)再一次發(fā)送出去,這樣可以極大的增加broker吞吐量,但也會造成時效性的降低。
  • queue.buffering.max.messages 默認(rèn)值:10000。啟用異步模式時,producer緩存隊列里最大緩存的消息數(shù)量,如果超過這個值,producer就會阻塞或者丟掉消息。
  • queue.enqueue.timeout.ms 默認(rèn)值:-1。當(dāng)達(dá)到上面參數(shù)時producer會阻塞等待的時間。如果設(shè)置為0,buffer隊列滿時producer不會阻塞,消息直接被丟掉;若設(shè)置為-1,producer會被阻塞,不會丟消息。
  • batch.num.messages 默認(rèn)值:200。啟用異步模式時,一個batch緩存的消息數(shù)量。達(dá)到這個數(shù)值時,producer才會發(fā)送消息。(每次批量發(fā)送的數(shù)量)

消費(fèi)者配置介紹

  • bootstrap.servers - Broker 集群地址,格式:ip1:port,ip2:port…,不需要設(shè)定全部的集群地址,設(shè)置兩個或者兩個以上即可。
  • group.id - 消費(fèi)者隸屬的消費(fèi)者組名稱,如果為空會報異常,一般而言,這個參數(shù)要有一定的業(yè)務(wù)意義。
  • fetch.min.bytes - 消費(fèi)者獲取記錄的最小字節(jié)數(shù)。Kafka 會等到有足夠的數(shù)據(jù)時才返回消息給消費(fèi)者,以降低負(fù)載。
  • **fetch.max.bytes** - 單次獲取數(shù)據(jù)的最大消息數(shù)。
  • fetch.max.wait.ms - Kafka 需要等待足夠的數(shù)據(jù)才返回給消費(fèi)者,如果一直沒有足夠的數(shù)據(jù),消費(fèi)者就會遲遲收不到消息。所以需要指定 Broker 的等待延遲,一旦超時,直接返回數(shù)據(jù)給消費(fèi)者。
  • max.partition.fetch.bytes - 指定了服務(wù)器從每個分區(qū)返回給消費(fèi)者的最大字節(jié)數(shù)。默認(rèn)為 1 MB。
  • session.timeout.ms - 指定了消費(fèi)者的心跳超時時間。如果消費(fèi)者沒有在有效時間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器,協(xié)調(diào)器會視消費(fèi)者已經(jīng)消亡,從而觸發(fā)分區(qū)再均衡。默認(rèn)為 3 秒。
  • enable.auto.commit - 指定了是否自動提交消息偏移量,默認(rèn)開啟。
  • **partition.assignment.strategy**消費(fèi)者的分區(qū)分配策略。
    • Range - 表示會將主題的若干個連續(xù)的分區(qū)分配給消費(fèi)者。
    • RoundRobin - 表示會將主題的所有分區(qū)按照輪詢方式分配給消費(fèi)者。
  • client.id - 客戶端標(biāo)識。
  • max.poll.records - 一次性拉取的條數(shù),這個參數(shù)用來配置 Consumer 在一次拉取請求中拉取的最大消息數(shù),默認(rèn)值為500(條)。如果消息的大小都比較小,則可以適當(dāng)調(diào)大這個參數(shù)值來提升一定的消費(fèi)速度。
  • max.poll.interval.ms -
  • receive.buffer.bytes - 用于設(shè)置 Socket 接收消息緩沖區(qū)(SO_RECBUF)的大小,默認(rèn)值為 64KB。如果設(shè)置為-1,則使用操作系統(tǒng)的默認(rèn)值。
  • send.buffer.bytes - 用于設(shè)置 Socket 發(fā)送消息緩沖區(qū)(SO_SNDBUF)的大小,默認(rèn)值為 128KB。與 receive.buffer.bytes 參數(shù)一樣,如果設(shè)置為-1,則使用操作系統(tǒng)的默認(rèn)值。
  • connections.max.idle.ms - 這個參數(shù)用來指定在多久之后關(guān)閉閑置的連接,默認(rèn)值是540000(ms),即9分鐘。
  • exclude.internal.topics - Kafka 中有兩個內(nèi)部的主題: __consumer_offsets__transaction_state。exclude.internal.topics 用來指定 Kafka 中的內(nèi)部主題是否可以向消費(fèi)者公開,默認(rèn)值為 true。如果設(shè)置為 true,那么只能使用 subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式來訂閱內(nèi)部主題,設(shè)置為 false 則沒有這個限制。
  • receive.buffer.bytes - 這個參數(shù)用來設(shè)置 Socket 接收消息緩沖區(qū)(SO_RECBUF)的大小,默認(rèn)值為65536(B),即64KB。如果設(shè)置為-1,則使用操作系統(tǒng)的默認(rèn)值。如果 Consumer 與 Kafka 處于不同的機(jī)房,則可以適當(dāng)調(diào)大這個參數(shù)值。
  • **send.buffer.bytes** - 這個參數(shù)用來設(shè)置Socket發(fā)送消息緩沖區(qū)(SO_SNDBUF)的大小,默認(rèn)值為131072(B),即128KB。與receive.buffer.bytes參數(shù)一樣,如果設(shè)置為-1,則使用操作系統(tǒng)的默認(rèn)值
  • **request.timeout.ms** - 這個參數(shù)用來配置 Consumer 等待請求響應(yīng)的最長時間,默認(rèn)值為30000(ms)。
  • **metadata.max.age.ms** - 這個參數(shù)用來配置元數(shù)據(jù)的過期時間,默認(rèn)值為300000(ms),即5分鐘。如果元數(shù)據(jù)在此參數(shù)所限定的時間范圍內(nèi)沒有進(jìn)行更新,則會被強(qiáng)制更新,即使沒有任何分區(qū)變化或有新的 broker 加入
  • **reconnect.backoff.ms** - 這個參數(shù)用來配置嘗試重新連接指定主機(jī)之前的等待時間(也稱為退避時間),避免頻繁地連接主機(jī),默認(rèn)值為50(ms)。這種機(jī)制適用于消費(fèi)者向 broker 發(fā)送的所有請求。
  • **auto.offset.reset** - 在 Kafka 中,每當(dāng)消費(fèi)者組內(nèi)的消費(fèi)者查找不到所記錄的消費(fèi)位移或發(fā)生位移越界時,就會根據(jù)消費(fèi)者客戶端參數(shù) auto.offset.reset 的配置來決定從何處開始進(jìn)行消費(fèi),這個參數(shù)的默認(rèn)值為 “l(fā)atest” 。
    • **earliest** :當(dāng)各分區(qū)下存在已提交的 offset 時,從提交的 offset 開始消費(fèi);無提交的 offset 時,從頭開始消費(fèi)。
    • **latest** :當(dāng)各分區(qū)下存在已提交的 offset 時,從提交的 offset 開始消費(fèi);無提交的 offset 時,消費(fèi)該分區(qū)下新產(chǎn)生的數(shù)據(jù)。
    • none :topic 各分區(qū)都存在已提交的 offset 時,從 offset 后開始消費(fèi);只要有一個分區(qū)不存在已提交的offset,則拋出異常。
  • interceptor.class - 用來配置消費(fèi)者客戶端的攔截器

--------------------------------------歡迎叨擾此地址---------------------------------------

本文作者:Java技術(shù)債務(wù)
原文鏈接:https://cuizb.top/myblog/article/1677833372
版權(quán)聲明: 本博客所有文章除特別聲明外,均采用 CC BY 3.0 CN協(xié)議進(jìn)行許可。轉(zhuǎn)載請署名作者且注明文章出處。文章來源地址http://www.zghlxwxcb.cn/news/detail-532476.html

到了這里,關(guān)于kafka生產(chǎn)者和消費(fèi)者配置介紹的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka 之生產(chǎn)者與消費(fèi)者基礎(chǔ)知識:基本配置、攔截器、序列化、分區(qū)器

    Kafka 之生產(chǎn)者與消費(fèi)者基礎(chǔ)知識:基本配置、攔截器、序列化、分區(qū)器

    kafaf集群地址列表:理論上寫一個節(jié)點(diǎn)地址,就相當(dāng)于綁定了整個kafka集群了,但是建議多寫幾個,如果只寫一個,萬一宕機(jī)就麻煩了 kafka消息的key和value要指定序列化方法 kafka對應(yīng)的生產(chǎn)者id 使用java代碼表示則為以下代碼: ?可使用?retries 參數(shù) 進(jìn)行設(shè)置,同時要注意記住兩

    2024年02月05日
    瀏覽(30)
  • Kafka系列之:Kafka生產(chǎn)者和消費(fèi)者

    Kafka系列之:Kafka生產(chǎn)者和消費(fèi)者

    batch.size:只有數(shù)據(jù)積累到batch.size之后,sender才會發(fā)送數(shù)據(jù),默認(rèn)16K。 linger.ms:如果數(shù)據(jù)遲遲未達(dá)到batch.size,sender等待linger.ms設(shè)置的時間到了之后就會發(fā)送數(shù)據(jù)。單位ms,默認(rèn)值是0ms,表示沒有延遲。 0:生產(chǎn)者發(fā)送過來的數(shù)據(jù),不需要等數(shù)據(jù)羅盤應(yīng)答。 1:生產(chǎn)者發(fā)送過來的

    2023年04月09日
    瀏覽(25)
  • Java輕松使用Kafka生產(chǎn)者,消費(fèi)者

    Java輕松使用Kafka生產(chǎn)者,消費(fèi)者 一、環(huán)境說明 項目中需要下面的依賴: ( 版本自定義 ) 2. yml配置文件設(shè)置 1. 簡單生產(chǎn)者的書寫: 1. 簡單消費(fèi)者的書寫: ? 注:多消費(fèi)者時,需要對應(yīng)kafka中配置的分區(qū);多少的Partition就有多少個消費(fèi)者,以免資源浪費(fèi)

    2024年02月15日
    瀏覽(29)
  • Kafka生產(chǎn)者與消費(fèi)者api示例

    Kafka生產(chǎn)者與消費(fèi)者api示例

    ? 一個正常的生產(chǎn)邏輯需要具備以下幾個步驟 配置生產(chǎn)者參數(shù)及創(chuàng)建相應(yīng)的生產(chǎn)者實(shí)例 構(gòu)建待發(fā)送的消息 發(fā)送消息 關(guān)閉生產(chǎn)者實(shí)例 采用默認(rèn)分區(qū)方式將消息散列的發(fā)送到各個分區(qū)當(dāng)中 ? ?對于properties配置的第二種寫法,相對來說不會出錯,簡單舉例: ? 1.kafka的生產(chǎn)者可

    2024年02月07日
    瀏覽(24)
  • kafka生產(chǎn)者和消費(fèi)者(python版)

    生產(chǎn)者 消費(fèi)者 消費(fèi)者中的組名主要用戶針對主題的偏移量進(jìn)行更改,也涉及到主題中分區(qū)的問題, kafka工具類 此工具類基本上拿過去就可以用 疑問 當(dāng)消費(fèi)者鏈接kafka時發(fā)現(xiàn)topic沒有未讀的消息怎樣退出呢,默認(rèn)是在一直等待,但是我期望沒有要讀的消息的時候直接退出即可

    2024年02月16日
    瀏覽(21)
  • Kafka官方生產(chǎn)者和消費(fèi)者腳本簡單使用

    怎樣使用Kafka官方生產(chǎn)者和消費(fèi)者腳本進(jìn)行消費(fèi)生產(chǎn)和消費(fèi)?這里假設(shè)已經(jīng)下載了kafka官方文件,并已經(jīng)解壓. 這就可以見到測試kafka對應(yīng)topic了.

    2024年02月04日
    瀏覽(23)
  • Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者

    Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者

    1.創(chuàng)建主題 2.查看所有主題 3.查看詳細(xì)主題 序號從0開始計算 Partition:分區(qū)數(shù),該主題有3個分區(qū) Replica:副本數(shù),該主題有3個副本 Leader:副本數(shù)中的主的序號,生產(chǎn)消費(fèi)的對象 1.修改分區(qū)數(shù) 修改的分區(qū)數(shù)量不可以小于或者等于當(dāng)前主題分區(qū)的數(shù)量,否則會報錯 在根目錄kaf

    2024年02月11日
    瀏覽(32)
  • Kafka系列:查看Topic列表、消息消費(fèi)情況、模擬生產(chǎn)者消費(fèi)者

    Kafka系列:查看Topic列表、消息消費(fèi)情況、模擬生產(chǎn)者消費(fèi)者

    執(zhí)行topic刪除命令時,出現(xiàn)提示 這條命令其實(shí)并不執(zhí)行刪除動作,僅僅是在zookeeper上標(biāo)記該topic要被刪除而已,同時也提醒用戶一定要提前打開delete.topic.enable開關(guān),否則刪除動作是不會執(zhí)行的。 解決辦法: a)在server.properties中設(shè)置delete.topic.enable參數(shù)為ture b)如下操作: 1.登

    2023年04月26日
    瀏覽(29)
  • 探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

    探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

    目錄 1. 多線程安全 1.1. 生產(chǎn)者是多線程安全的么? 1.1. 消費(fèi)者是多線程安全的么? 2. 消費(fèi)者規(guī)避多線程安全方案 2.1. 每個線程維護(hù)一個kafkaConsumer 2.2. [單/多]kafkaConsumer實(shí)例 + 多worker線程 2.3.方案優(yōu)缺點(diǎn)對比 ????????Kafka生產(chǎn)者是 線程安全 的,可以在多個線程中共享一個

    2023年04月26日
    瀏覽(24)
  • Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費(fèi)者

    Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費(fèi)者

    1.創(chuàng)建安裝目錄/usr/local/kafka mkdir /usr/local/kafka 2.進(jìn)入安裝包目錄 cd?/usr/local/kafka? 3.下載安裝包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解壓安裝包 tar -zxvf kafka_2.12-3.3.1.tgz 5.進(jìn)入cd kafka_2.12-3.3.1目錄 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包