国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【消息中間件MQ系列】Spring整合kafka并設(shè)置多套kafka配置

這篇具有很好參考價(jià)值的文章主要介紹了【消息中間件MQ系列】Spring整合kafka并設(shè)置多套kafka配置。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

?1、前言

? ? ? ? 圣誕節(jié)的到來,程序員不會(huì)收到圣誕老人的??,但可以自己滿足一下自己,所以,趁著有時(shí)間,就記錄一下這會(huì)兒擼了些什么代碼吧?。?!

????????因?yàn)闃I(yè)務(wù)原因,需要在系統(tǒng)內(nèi)新增其他的kakfa配置使用,所以今天研究的是怎么在系統(tǒng)內(nèi)整合多套kafka配置使用。


2、整合kafka實(shí)踐

首先,引入pom依賴,我的版本是 2.3.1?

<dependency>
? ? <groupId>org.springframework.kafka</groupId>
? ? <artifactId>spring-kafka</artifactId>
</dependency>

然后,設(shè)置properties配置:

spring.kafka.bootstrap-servers=127.0.0.1:9090,127.0.0.1:9091,127.0.0.1:9092
spring.kafka2.bootstrap-servers=127.0.0.2:9090,127.0.0.2:9091,127.0.0.2:9092

?另外還可以設(shè)置很多其他配置,例如生產(chǎn)者、消費(fèi)者各自的消息序列化、消費(fèi)模式等等,

具體的配置樣例,請(qǐng)參考如下:

#################consumer的配置參數(shù)(開始)#################
#如果'enable.auto.commit'為true,則消費(fèi)者偏移自動(dòng)提交給Kafka的頻率(以毫秒為單位),默認(rèn)值為5000。
spring.kafka.consumer.auto-commit-interval;
 
#當(dāng)Kafka中沒有初始偏移量或者服務(wù)器上不再存在當(dāng)前偏移量時(shí)該怎么辦,默認(rèn)值為latest,表示自動(dòng)將偏移重置為最新的偏移量
# latest:不存在offset時(shí),消費(fèi)最新的消息
# earliest:不存在offset時(shí),從最早消息開始消費(fèi)
# none :不存在offset時(shí),直接報(bào)錯(cuò)
spring.kafka.consumer.auto-offset-reset=latest;
 
#以逗號(hào)分隔的主機(jī):端口對(duì)列表,用于建立與Kafka群集的初始連接。
spring.kafka.consumer.bootstrap-servers;
 
#ID在發(fā)出請(qǐng)求時(shí)傳遞給服務(wù)器;用于服務(wù)器端日志記錄。
spring.kafka.consumer.client-id;
 
#如果為true,則消費(fèi)者的偏移量將在后臺(tái)定期提交,默認(rèn)值為true
spring.kafka.consumer.enable-auto-commit=true;
 
#如果沒有足夠的數(shù)據(jù)立即滿足“fetch.min.bytes”給出的要求,服務(wù)器在回答獲取請(qǐng)求之前將阻塞的最長(zhǎng)時(shí)間(以毫秒為單位)
#默認(rèn)值為500
spring.kafka.consumer.fetch-max-wait;
 
#服務(wù)器應(yīng)以字節(jié)為單位返回獲取請(qǐng)求的最小數(shù)據(jù)量,默認(rèn)值為1,對(duì)應(yīng)的kafka的參數(shù)為fetch.min.bytes。
spring.kafka.consumer.fetch-min-size;
 
#用于標(biāo)識(shí)此使用者所屬的使用者組的唯一字符串。
spring.kafka.consumer.group-id;
 
#心跳與消費(fèi)者協(xié)調(diào)員之間的預(yù)期時(shí)間(以毫秒為單位),默認(rèn)值為3000
spring.kafka.consumer.heartbeat-interval;
 
#密鑰的反序列化器類,實(shí)現(xiàn)類實(shí)現(xiàn)了接口org.apache.kafka.common.serialization.Deserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 
#值的反序列化器類,實(shí)現(xiàn)類實(shí)現(xiàn)了接口org.apache.kafka.common.serialization.Deserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 
#一次調(diào)用poll()操作時(shí)返回的最大記錄數(shù),默認(rèn)值為500
spring.kafka.consumer.max-poll-records;
#################consumer的配置參數(shù)(結(jié)束)#################
#################producer的配置參數(shù)(開始)#################
#procedure要求leader在考慮完成請(qǐng)求之前收到的確認(rèn)數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化,其值可以為如下:
#acks = 0 如果設(shè)置為零,則生產(chǎn)者將不會(huì)等待來自服務(wù)器的任何確認(rèn),該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無(wú)法保證服務(wù)器已收到記錄,并且重試配置將不會(huì)生效(因?yàn)榭蛻舳送ǔ2粫?huì)知道任何故障),為每條記錄返回的偏移量始終設(shè)置為-1。
#acks = 1 這意味著leader會(huì)將記錄寫入其本地日志,但無(wú)需等待所有副本服務(wù)器的完全確認(rèn)即可做出回應(yīng),在這種情況下,如果leader在確認(rèn)記錄后立即失敗,但在將數(shù)據(jù)復(fù)制到所有的副本服務(wù)器之前,則記錄將會(huì)丟失。
#acks = all 這意味著leader將等待完整的同步副本集以確認(rèn)記錄,這保證了只要至少一個(gè)同步副本服務(wù)器仍然存活,記錄就不會(huì)丟失,這是最強(qiáng)有力的保證,這相當(dāng)于acks = -1的設(shè)置。
#可以設(shè)置的值為:all, -1, 0, 1
spring.kafka.producer.acks=1
 
#每當(dāng)多個(gè)記錄被發(fā)送到同一分區(qū)時(shí),生產(chǎn)者將嘗試將記錄一起批量處理為更少的請(qǐng)求, 
#這有助于提升客戶端和服務(wù)器上的性能,此配置控制默認(rèn)批量大?。ㄒ宰止?jié)為單位),默認(rèn)值為16384
spring.kafka.producer.batch-size=16384
 
#以逗號(hào)分隔的主機(jī):端口對(duì)列表,用于建立與Kafka群集的初始連接
spring.kafka.producer.bootstrap-servers
 
#生產(chǎn)者可用于緩沖等待發(fā)送到服務(wù)器的記錄的內(nèi)存總字節(jié)數(shù),默認(rèn)值為33554432
spring.kafka.producer.buffer-memory=33554432
 
#ID在發(fā)出請(qǐng)求時(shí)傳遞給服務(wù)器,用于服務(wù)器端日志記錄
spring.kafka.producer.client-id
 
#生產(chǎn)者生成的所有數(shù)據(jù)的壓縮類型,此配置接受標(biāo)準(zhǔn)壓縮編解碼器('gzip','snappy','lz4'),
#它還接受'uncompressed'以及'producer',分別表示沒有壓縮以及保留生產(chǎn)者設(shè)置的原始?jí)嚎s編解碼器,
#默認(rèn)值為producer
spring.kafka.producer.compression-type=producer
 
#key的Serializer類,實(shí)現(xiàn)類實(shí)現(xiàn)了接口org.apache.kafka.common.serialization.Serializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
 
#值的Serializer類,實(shí)現(xiàn)類實(shí)現(xiàn)了接口org.apache.kafka.common.serialization.Serializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 
#如果該值大于零時(shí),表示啟用重試失敗的發(fā)送次數(shù)
spring.kafka.producer.retries
#################producer的配置參數(shù)(結(jié)束)#################
#################listener的配置參數(shù)(結(jié)束)#################
#偵聽器的AckMode,參見https://docs.spring.io/spring-kafka/reference/htmlsingle/#committing-offsets
#當(dāng)enable.auto.commit的值設(shè)置為false時(shí),該值會(huì)生效;為true時(shí)不會(huì)生效
spring.kafka.listener.ack-mode;
 
#在偵聽器容器中運(yùn)行的線程數(shù)
spring.kafka.listener.concurrency;
 
#輪詢消費(fèi)者時(shí)使用的超時(shí)(以毫秒為單位)
spring.kafka.listener.poll-timeout;
 
#當(dāng)ackMode為“COUNT”或“COUNT_TIME”時(shí),偏移提交之間的記錄數(shù)
spring.kafka.listener.ack-count;
 
#當(dāng)ackMode為“TIME”或“COUNT_TIME”時(shí),偏移提交之間的時(shí)間(以毫秒為單位)
spring.kafka.listener.ack-time;
#################listener的配置參數(shù)(結(jié)束)#################

最后,編寫kafka的config配置類:

package ***.***.***.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfiuration {

	@Value("${spring.kafka.bootstrap-servers}")
	private String bootstrapServer;
	
	@Value("${spring.kafka2.bootstrap-servers}")
	private String bootstrapServer2;

	@Bean("kafkaTemplateOne")
	public KafkaTemplate<String, String> oneReqKafkaTemplate() {
		return new KafkaTemplate<>(oneEnquiryReqFactory());
	}

	@Bean
	public ProducerFactory<String, String> oneEnquiryReqFactory() {
		return new DefaultKafkaProducerFactory<>(oneProducerConfigs());
	}

	@Bean
	public Map<String, Object> oneProducerConfigs() {
		Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 		return props;
	}

	@Bean("kafkaTemplateTwo")
	public KafkaTemplate<String, String> twoKafkaTemplate() {
		return new KafkaTemplate<>(twoFactory());
	}

	@Bean
	public ProducerFactory<String, String> twoFactory() {
		return new DefaultKafkaProducerFactory<>(twoProducerConfigs());
	}

	@Bean
	public Map<String, Object> twoProducerConfigs() {
		Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer2);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return props;
	}
}

配置完成之后,就可以直接使用啦:

package ***.***.***.controller;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class KafkaTestController {
	
	@Resource(name = "kafkaTemplateOne")
	private KafkaTemplate<String, String> kafkaTemplateOne;
	
	@Resource(name = "kafkaTemplateTwo")
	private KafkaTemplate<String, String> kafkaTemplateTwo;
	
	private final static String TOPIC1 = "testTopic1";
	private final static String TOPIC2 = "testTopic2";
	
	@PostMapping("/testKafka")
	public void testKafka(){
		kafkaTemplateOne.send(TOPIC1,"test message 1");
		kafkaTemplateTwo.send(TOPIC2,"test message 2");
	}
	
	@KafkaListener(groupId = "TOPIC_DATA_ACCOUNT_GROUPID",topics = TOPIC1)
	public void receiveMsg(String msg){
		System.out.println(TOPIC1+" 接收到的消息是:"+msg);
	}
	
	@KafkaListener(groupId = "TOPIC_DATA_ACCOUNT_GROUPID",topics = TOPIC2)
	public void receiveMsg1(String msg){
		System.out.println(TOPIC2+" 接收到的消息是:"+msg);
	}
}

效果如下:

【消息中間件MQ系列】Spring整合kafka并設(shè)置多套kafka配置

2022-11-15 18:35:28,973 [kafka-producer-network-thread | producer-2] [] INFO (Metadata.java:261)- [Producer clientId=producer-2] Cluster ID: SrufjpSTQpiu0QseCKbwYg
testTopic2 接收到的消息是:test message 2
testTopic1 接收到的消息是:test message 1


3、總結(jié)

????????本次的記錄內(nèi)容,只是簡(jiǎn)單的demo實(shí)踐,具體的使用情況,可以根據(jù)自身系統(tǒng)設(shè)置詳細(xì)配置處理。
????????若有疑問,歡迎留言討論~~。

????????最后祝大家圣誕節(jié)快樂,新年快到了,繼續(xù)加油吧?。?!

【消息中間件MQ系列】Spring整合kafka并設(shè)置多套kafka配置文章來源地址http://www.zghlxwxcb.cn/news/detail-427534.html

到了這里,關(guān)于【消息中間件MQ系列】Spring整合kafka并設(shè)置多套kafka配置的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【消息中間件】詳解mq消息積壓

    【消息中間件】詳解mq消息積壓

    作者簡(jiǎn)介 目錄 1.產(chǎn)生原因 2.解決辦法 2.1.事前處理機(jī)制 2.2.事中處理機(jī)制 2.3.事后處理機(jī)制 消息積壓(Message Backlog)指的是在消息隊(duì)列(MQ)系統(tǒng)中等待被處理的消息數(shù)量超過了正常的處理速度,導(dǎo)致消息在隊(duì)列中積壓堆積的情況。 消息積壓的常見表現(xiàn): 系統(tǒng)資源使用率上升

    2024年02月07日
    瀏覽(31)
  • 實(shí)戰(zhàn):Spring Cloud Stream集成兼容多消息中間件kafka、rabbitmq

    實(shí)戰(zhàn):Spring Cloud Stream集成兼容多消息中間件kafka、rabbitmq

    前面的博文我們介紹并實(shí)戰(zhàn)演示了Spring Cloud Stream整合rabbitmq,其中主要介紹了如何使用和配置完成消息中間件的集成。但是,在實(shí)際的生產(chǎn)環(huán)境中可能會(huì)用到多個(gè)消息中間件,又或者是由于業(yè)務(wù)改變需要更換消息中間件,在這些情況下我們的Spring Cloud Stream框架可以完全兼容多

    2024年02月08日
    瀏覽(25)
  • 快速掌握MQ消息中間件rabbitmq

    Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Survive. happy for hardess to solve denpendies. 需求: 1.video A https://www.bilibili.com/video/BV1cb4y1o7zz?p=12vd_source=533ee415c42b820b0f4105acb4932a02 參考資料 官方文檔 開源社區(qū) 博客文

    2024年02月11日
    瀏覽(28)
  • MQ(消息中間件)概述及 RabbitMQ 的基本介紹

    MQ(消息中間件)概述及 RabbitMQ 的基本介紹

    消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決 應(yīng)用解耦,異步消息,流量削鋒等 問題,實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。流量削鋒 : 削減峰值壓力(秒殺,搶購(gòu)) MQ(Message Queue,消息隊(duì)列)是典型的生產(chǎn)者、消費(fèi)者模型。生產(chǎn)者不斷向消息隊(duì)列中

    2024年02月12日
    瀏覽(24)
  • 【中間件】消息中間件之Kafka

    一、概念介紹 Apache Kafka是一個(gè)分布式流處理平臺(tái),用于構(gòu)建實(shí)時(shí)數(shù)據(jù)管道和流應(yīng)用。它可以處理網(wǎng)站、應(yīng)用或其他來源產(chǎn)生的大量數(shù)據(jù)流,并能實(shí)時(shí)地將這些數(shù)據(jù)流傳輸?shù)搅硪粋€(gè)系統(tǒng)或應(yīng)用中進(jìn)行處理。 核心概念: Topic(主題) :消息的分類,用于區(qū)分不同的業(yè)務(wù)消息。

    2024年01月20日
    瀏覽(43)
  • Springboot整合RabbitMQ消息中間件

    spring-boot-rabbitmq–消息中間件整合 前言:RabbitMQ的各種交換機(jī)說明 1、直連交換機(jī) 生產(chǎn)者發(fā)布消息時(shí)必須帶著routing-key,隊(duì)列綁定到交換機(jī)時(shí)必須指定binding-key ,且routing-key和binding-key必須完全相同,如此才能將消息路由到隊(duì)列中 直連交換機(jī)通常用來循環(huán)分發(fā)任務(wù)給多個(gè)workers,

    2024年02月11日
    瀏覽(33)
  • 消息中間件(二)——kafka

    消息中間件(二)——kafka

    在大數(shù)據(jù)中,會(huì)使用到大量的數(shù)據(jù)。面對(duì)這些海量的數(shù)據(jù),我們一是需要做到能夠 收集 這些數(shù)據(jù),其次是要能夠 分析和處理 這些海量數(shù)據(jù)。在此過程中,需要一套消息系統(tǒng)。 Kafka專門為分 布式高吞吐量 系統(tǒng)設(shè)計(jì)。作為一個(gè)消息代理的替代品,Kafka往往做的比其他消息中間

    2024年02月07日
    瀏覽(29)
  • 消息中間件 —— 初識(shí)Kafka

    消息中間件 —— 初識(shí)Kafka

    1.1.1、為什么要有消息隊(duì)列? 1.1.2、消息隊(duì)列 消息 Message 網(wǎng)絡(luò)中的兩臺(tái)計(jì)算機(jī)或者兩個(gè)通訊設(shè)備之間傳遞的數(shù)據(jù)。例如說:文本、音樂、視頻等內(nèi)容。 隊(duì)列 Queue 一種特殊的線性表(數(shù)據(jù)元素首尾相接),特殊之處在于只允許在首部刪除元素和在尾部追加元素(FIFO)。 入隊(duì)、出

    2024年02月13日
    瀏覽(25)
  • 消息中間件之Kafka(一)

    消息中間件之Kafka(一)

    高性能的消息中間件,在大數(shù)據(jù)的業(yè)務(wù)場(chǎng)景下性能比較好,kafka本身不維護(hù)消息位點(diǎn),而是交由Consumer來維護(hù),消息可以重復(fù)消費(fèi),并且內(nèi)部使用了零拷貝技術(shù),性能比較好 Broker持久化消息時(shí)采用了MMAP的技術(shù),Consumer拉取消息時(shí)使用的sendfile技術(shù) Kafka是最初由Linkedin公司開發(fā),

    2024年01月20日
    瀏覽(53)
  • 消息中間件之Kafka(二)

    消息中間件之Kafka(二)

    1.1 為什么要對(duì)topic下數(shù)據(jù)進(jìn)行分區(qū)存儲(chǔ)? 1.commit log文件會(huì)受到所在機(jī)器的文件系統(tǒng)大小的限制,分區(qū)之后可以將不同的分區(qū)放在不同的機(jī)器上, 相當(dāng)于對(duì)數(shù)據(jù)做了分布式存儲(chǔ),理論上一個(gè)topic可以處理任意數(shù)量的數(shù)據(jù) 2.提高并行度 1.2 如何在多個(gè)partition中保證順序消費(fèi)? 方案一

    2024年01月21日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包