国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka學(xué)習(xí)-基本概念與簡單實戰(zhàn)

這篇具有很好參考價值的文章主要介紹了kafka學(xué)習(xí)-基本概念與簡單實戰(zhàn)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

1、核心概念

消息和批次

Topic和Partition

Replicas

Offset

broker和集群

生產(chǎn)者和消費者

2、開發(fā)實戰(zhàn)

2.1、消息發(fā)送

介紹

代碼實現(xiàn)

2.2、消息消費

介紹

代碼實現(xiàn)

2.3、SpringBoot Kafka

pom

application.yaml

KafkaConfig

producer

consumer


1、核心概念

消息和批次

????????kafka的基本數(shù)據(jù)單元,由字節(jié)數(shù)組組成??梢岳斫獬蓴?shù)據(jù)庫的一條數(shù)據(jù)。

????????批次就是一組消息,把同一個主題和分區(qū)的消息分批次寫入kafka,可以減少網(wǎng)絡(luò)開銷,提高效率;批次越大,單位時間內(nèi)處理的消息就越多,單個消息的傳輸時間就越長。

Topic和Partition

? ? ? ? topic主題,kafka通過主題進行分類。主題可以理解成數(shù)據(jù)庫的表或者文件系統(tǒng)里的文件夾。

? ? ? ? partition分區(qū)可以理解成一個FIFO的消息隊列。(同一個分區(qū)的消息保證順序消費)

????????主題可以被分為若干分區(qū),一個主題通過分區(qū)將消息存儲在kafka集群中,提供橫向擴展的能力。消息以追加的方式寫入分區(qū),每個分區(qū)保證先入先出的順序讀取。在需要嚴格保證消息順序消費的場景下,可以將partition設(shè)置為1,即主題只有一個分區(qū)。

? ? ? ? 主題的分區(qū)策略有如下幾種:

  1. 直接指定分區(qū);
  2. 根據(jù)消息的key散列取模得出分區(qū);
  3. 輪詢指定分區(qū)。

kafka學(xué)習(xí)-基本概念與簡單實戰(zhàn),大數(shù)據(jù),kafka,kafka

Replicas

  1. 副本,每個分區(qū)都有多個副本。其中包含一個首領(lǐng)副本和多個跟隨者副本。
  2. 首領(lǐng)副本用于響應(yīng)生產(chǎn)者的消息寫入請求與消費者的消息讀取請求;
  3. 跟隨者副本用于同步首領(lǐng)副本的數(shù)據(jù),保持與首領(lǐng)副本一致的狀態(tài),有數(shù)據(jù)備份的功能。
  4. 一旦首領(lǐng)副本所在的服務(wù)器宕機,就會從跟隨者中選出一個升級為首領(lǐng)副本。

Offset

? ? ? ? 偏移量。

? ? ? ? 生產(chǎn)者offset:每個分區(qū)都有一個offset,叫做生產(chǎn)者的offset,可以理解為當(dāng)前這個分區(qū)隊列的最大值,下一個消息來的時候,就會將消息寫入到offset這個位置。

? ? ? ? 消費者offset:每個消費者消費分區(qū)中的消息時,會記錄消費的位置(offset),下一次消費時就會從這個位置開始消費。

broker和集群

kafka學(xué)習(xí)-基本概念與簡單實戰(zhàn),大數(shù)據(jù),kafka,kafka

broker為一個獨立的kafka服務(wù)器;一個kafka集群里有多個broker。

? ? ? ? broker接收來自生產(chǎn)者的消息,為消息設(shè)置偏移量,并將消息保存到磁盤。同時,broker為消費者提供服務(wù),對讀取分區(qū)的請求做出響應(yīng),返回已經(jīng)保存到磁盤上的消息。(單個broker可以輕松處理數(shù)千個分區(qū)以及每秒百萬級的消息量)。

? ? ? ? 集群中同一個主題的同一個分區(qū),會在多個broker上存在;其中一個broker上的分區(qū)被稱為首領(lǐng)分區(qū),用于與生產(chǎn)者和消費者交互,其余broker上的分區(qū)叫做副本分區(qū),用于備份分區(qū)數(shù)據(jù),防止broker宕機導(dǎo)致消息丟失。

? ? ? ? 每個集群都有一個broker是集群控制器,作用如下:

  1. 將分區(qū)分配給首領(lǐng)分區(qū)的broker;
  2. 監(jiān)控broker,首領(lǐng)分區(qū)切換

生產(chǎn)者和消費者

kafka學(xué)習(xí)-基本概念與簡單實戰(zhàn),大數(shù)據(jù),kafka,kafka

????????生產(chǎn)者生產(chǎn)消息,消息被發(fā)布到一個特定的主題上。默認情況下,kafka會將消息均勻地分布到主題的所有分區(qū)上。分區(qū)策略有如下幾種:

  1. 直接指定分區(qū);
  2. 根據(jù)消息的key散列取模得出分區(qū);
  3. 輪詢指定分區(qū)。

? ? ? ? 消費者通過偏移量來區(qū)分已經(jīng)讀過的消息,從而消費消息。消費者是消費組的一部分,消費組可以保證每個分區(qū)只能被一個消費者使用,避免重復(fù)消費。

2、開發(fā)實戰(zhàn)

2.1、消息發(fā)送

介紹

  • 生產(chǎn)者主要有KafkaProducer和ProducerRecord兩個對象:KafkaProducer用于發(fā)送消息,ProducerRecord用于封裝kafka消息。
  • 生產(chǎn)者生產(chǎn)消息后,需要broker的確認,可以選擇同步或者異步確認:同步確認效率低;異步確認效率高,但需要設(shè)置回調(diào)對象。? ? ? ??

kafka學(xué)習(xí)-基本概念與簡單實戰(zhàn),大數(shù)據(jù),kafka,kafkakafka學(xué)習(xí)-基本概念與簡單實戰(zhàn),大數(shù)據(jù),kafka,kafka

代碼實現(xiàn)

public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
	Map<String, Object> configs = new HashMap<>();
	// 設(shè)置連接Kafka的初始連接?到的服務(wù)器地址
	// 如果是集群,則可以通過此初始連接發(fā)現(xiàn)集群中的其他broker
	 configs.put("bootstrap.servers", "node1:9092");
	// 設(shè)置key和value的序列化器
	 configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
	 configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
	 configs.put("acks", "1");
	 KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
	 // 用于封裝Producer的消息
	 ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
		 "topic_1", // 主題名稱
		 0, // 分區(qū)編號,現(xiàn)在只有?個分區(qū),所以是0
		 0, // 數(shù)字作為key
		 "message 0" // 字符串作為value
	 );
	 // 發(fā)送消息,同步等待消息的確認
	 // producer.send(record).get(3_000, TimeUnit.MILLISECONDS);
	 
	 // 使用回調(diào)異步等待消息的確認
	 producer.send(record, new Callback() {
		 @Override
		 public void onCompletion(RecordMetadata metadata, Exception exception) {
			if (exception == null) {
				 System.out.println(
					 "主題:" + metadata.topic() + "\n"
					 + "分區(qū):" + metadata.partition() + "\n"
					 + "偏移量:" + metadata.offset() + "\n"
					 + "序列化的key字節(jié):" + metadata.serializedKeySize() + "\n"
					 + "序列化的value字節(jié):" + metadata.serializedValueSize() + "\n"
					 + "時間戳:" + metadata.timestamp()
				 );
			 } else {
				System.out.println("有異常:" + exception.getMessage());
			 }
		 }
	 });
	 // 關(guān)閉連接
	 producer.close();
}

2.2、消息消費

介紹

????????消費者主要有KafkaConsumer對象,用于消費消息。Kafka不支持消息的推送,我們可以通過消息拉取(poll)方式實現(xiàn)消息的消費。KafkaConsumer主要參數(shù)如下:

kafka學(xué)習(xí)-基本概念與簡單實戰(zhàn),大數(shù)據(jù),kafka,kafka

代碼實現(xiàn)

public static void main(String[] args) {
	Map<String, Object> configs = new HashMap<>();
	// 指定bootstrap.servers屬性作為初始化連接Kafka的服務(wù)器。
	// 如果是集群,則會基于此初始化連接發(fā)現(xiàn)集群中的其他服務(wù)器。
	configs.put("bootstrap.servers", "node1:9092");
	// key和value的反序列化器
	configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
	configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	configs.put("group.id", "consumer.demo");
	// 創(chuàng)建消費者對象
	KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);

	final Pattern pattern = Pattern.compile("topic_[0-9]");
	// 消費者訂閱主題或分區(qū)
	// consumer.subscribe(pattern);
	// consumer.subscribe(pattern, new ConsumerRebalanceListener() {
	final List<String> topics = Arrays.asList("topic_1");
	consumer.subscribe(topics, new ConsumerRebalanceListener() {
		@Override
		public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
			partitions.forEach(tp -> {
				System.out.println("剝奪的分區(qū):" + tp.partition());
			});	
		}
		@Override
		public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
			partitions.forEach(tp -> {
				System.out.println(tp.partition());
			});
		}
	});
	// 拉取訂閱主題的消息
	final ConsumerRecords<Integer, String> records = consumer.poll(3_000);
	// 獲取topic_1主題的消息
	final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");
	// 遍歷topic_1主題的消息
	topic1Iterable.forEach(record -> {
		System.out.println("========================================");
		System.out.println("消息頭字段:" + Arrays.toString(record.headers().toArray()));
		System.out.println("消息的key:" + record.key());
		System.out.println("消息的值:" + record.value());
		System.out.println("消息的主題:" + record.topic());
		System.out.println("消息的分區(qū)號:" + record.partition());
		System.out.println("消息的偏移量:" + record.offset());
	});
	// 關(guān)閉消費者
	consumer.close();
}

2.3、SpringBoot Kafka

pom

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
	</dependency>
</dependencies>

application.yaml

spring:
  kafka:
    bootstrap-servers: node1:9092       # 用于建立初始連接的broker地址
    producer:
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 16384                 # 默認的批處理記錄數(shù)
      buffer-memory: 33554432           # 32MB的總發(fā)送緩存
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: spring-kafka-02-consumer    # consumer的消費組id
      enable-auto-commit: true              # 是否自動提交消費者偏移量
      auto-commit-interval: 100             # 每隔100ms向broker提交一次偏移量
      auto-offset-reset: earliest           # 如果該消費者的偏移量不存在,則自動設(shè)置為最早的偏移量

KafkaConfig

@Configuration
public class KafkaConfig {
	@Bean
	public NewTopic topic1() {
		return new NewTopic("ntp-01", 5, (short) 1);
	}
	@Bean
	public NewTopic topic2() {
		return new NewTopic("ntp-02", 3, (short) 1);
	}
}

producer

@RestController
public class KafkaSyncProducerController {
	@Autowired
	private KafkaTemplate template;
	
	@RequestMapping("send/sync/{message}")
	public String sendSync(@PathVariable String message) {
		ListenableFuture future = template.send(new ProducerRecord<Integer, String>("topic-spring-02", 0, 1, message));
		try {
			// 同步等待broker的響應(yīng)
			Object o = future.get();
			SendResult<Integer, String> result = (SendResult<Integer, String>) o;
			System.out.println(result.getRecordMetadata().topic() + result.getRecordMetadata().partition() + result.getRecordMetadata().offset());
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		return "success";
	}
}

@RestController
public class KafkaAsyncProducerController {
	@Autowired
	private KafkaTemplate<Integer, String> template;
	
	@RequestMapping("send/async/{message}")
	public String asyncSend(@PathVariable String message) {
		ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic-spring-02", 0, 3, message);
		ListenableFuture<SendResult<Integer, String>> future = template.send(record);
		// 添加回調(diào),異步等待響應(yīng)
		future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>(){
			@Override
			public void onFailure(Throwable throwable) {
				System.out.println("發(fā)送失敗: " + throwable.getMessage());
			}
			
			@Override
			public void onSuccess(SendResult<Integer, String> result) {
				System.out.println("發(fā)送成功:" + result.getRecordMetadata().topic() + "\t" + result.getRecordMetadata().partition() + "\t" + result.getRecordMetadata().offset());
			}
		});
		return "success";
	}
}

consumer

@Component
public class MyConsumer {

	@KafkaListener(topics = "topic-spring-02")
	public void onMessage(ConsumerRecord<Integer, String> record) {
		Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);
		if (optional.isPresent()) {
			System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());
		}
	}
}

以上內(nèi)容為個人學(xué)習(xí)理解,如有問題,歡迎在評論區(qū)指出。

部分內(nèi)容截取自網(wǎng)絡(luò),如有侵權(quán),聯(lián)系作者刪除。文章來源地址http://www.zghlxwxcb.cn/news/detail-704385.html

到了這里,關(guān)于kafka學(xué)習(xí)-基本概念與簡單實戰(zhàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 學(xué)習(xí)Vue:數(shù)據(jù)綁定的基本概念

    在 Vue.js 中,Vue 實例是您構(gòu)建應(yīng)用程序的核心。它允許您將數(shù)據(jù)和界面連接起來,實現(xiàn)動態(tài)的數(shù)據(jù)綁定,使您的應(yīng)用程序能夠根據(jù)數(shù)據(jù)的變化自動更新界面。讓我們來深入了解 Vue 實例與數(shù)據(jù)綁定的基本概念。 什么是 Vue 實例? Vue 實例是 Vue.js 應(yīng)用程序的基本構(gòu)建塊。它是一

    2024年02月13日
    瀏覽(23)
  • kafka--kafka的基本概念-副本概念replica

    kafka--kafka的基本概念-副本概念replica

    Broker 表示實際的物理機器節(jié)點 Broker1中的綠色P1表示主分片Broker2中的藍色P1表示副本分片,其余類似,就是主從的概念,如果一個Broker掛掉了,還有其它的節(jié)點來保證數(shù)據(jù)的完整性 P可以看做分區(qū) 同一時間點,綠色P1 和紫色P1 不會完全一致,存在一個同步的過程 綠色部分處理

    2024年02月12日
    瀏覽(24)
  • 【Kafka】基本概念

    【Kafka】基本概念

    這個流派通常有?臺服務(wù)器作為 Broker,所有的消息都通過它中轉(zhuǎn)。?產(chǎn)者把消息發(fā)送給它就結(jié)束??的任務(wù)了,Broker 則把消息主動推送給消費者(或者消費者主動輪詢) 1.1.1 重topic 生產(chǎn)者生產(chǎn)的消息有topic,消費者訂閱topic,在重topic的消息隊列?必然需要topic的存在 1.1.2 輕

    2024年02月05日
    瀏覽(28)
  • Kafka入門基本概念(詳細)

    Kafka入門基本概念(詳細)

    Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)(消息引擎系統(tǒng)),它可以處理消費者在網(wǎng)站中的所有動作流數(shù)據(jù)。 這種動作(網(wǎng)頁瀏覽,搜索和其他用戶的行動)是在現(xiàn)代網(wǎng)絡(luò)上的許多社會功能的一個關(guān)鍵因素。 這些數(shù)據(jù)通常是由于吞吐量的要求而通過處理日志和日志聚合

    2024年01月16日
    瀏覽(21)
  • Kafka基本概念

    Kafka基本概念

    Kafka是最初由Linkedin公司開發(fā),是一個分布式、分區(qū)的、多副本的、多生產(chǎn)者、多訂閱者,基于 zookeeper協(xié)調(diào)的分布式日志系統(tǒng)(也可以當(dāng)做MQ系統(tǒng)),常見可以用于web/nginx日志、訪問日志,消息服務(wù)等等。 主要應(yīng)用場景是:日志收集系統(tǒng)和消息系統(tǒng)。 Kafka主要設(shè)計目標(biāo)如下:

    2024年02月13日
    瀏覽(18)
  • Kafka - 深入了解Kafka基礎(chǔ)架構(gòu):Kafka的基本概念

    Kafka - 深入了解Kafka基礎(chǔ)架構(gòu):Kafka的基本概念

    我們首先了解一些Kafka的基本概念。 1)Producer :消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端 2)Consumer :消息消費者,向kafka broker獲取消息的客戶端 3)Consumer Group (CG):消費者組,由多個consumer組成。消費者組內(nèi)每個消費者負責(zé)消費不同分區(qū)的數(shù)據(jù),一個broker可以有多個

    2024年02月08日
    瀏覽(20)
  • kafka--技術(shù)文檔-基本概念-《快速了解kafka》

    kafka--技術(shù)文檔-基本概念-《快速了解kafka》

    學(xué)習(xí)一種新的消息中間鍵,卡夫卡?。?! 官網(wǎng)網(wǎng)址 Apache Kafka ????????Kafka是一種開源的分布式流處理平臺,由Apache軟件基金會開發(fā),用Scala和Java編寫。它是一個高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),可以處理消費者在網(wǎng)站中的所有動作流數(shù)據(jù)。這種動作可以是網(wǎng)頁瀏覽、

    2024年02月11日
    瀏覽(17)
  • Kafka的基本概念和架構(gòu)

    Kafka的基本概念和架構(gòu)

    1.1 定義 Kafka是一個開源的分布式事件流平臺 ( Event Streaming Platform ),被廣泛用于高性能數(shù)據(jù)管道、流分 析、數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用。 1.2 各消息隊列的比較 目前比 較常見的消息隊列產(chǎn)品主要有 Kafka 、 RabbitMQ 、 RocketMQ 等。 在大數(shù)據(jù)場景主要采用 Kafka 作為消息隊列。在

    2024年02月01日
    瀏覽(34)
  • kafka--kafka的基本概念-topic和partition

    kafka--kafka的基本概念-topic和partition

    topic是邏輯概念 以Topic機制來對消息進行分類的,同一類消息屬于同一個Topic,你可以將每個topic看成是一個消息隊列。 生產(chǎn)者(producer)將消息發(fā)送到相應(yīng)的Topic,而消費者(consumer)通過從Topic拉取消息來消費 kafka中是要求消費者主動拉取消息消費的,它并不會主動推送消息

    2024年02月12日
    瀏覽(26)
  • 軟件設(shè)計師學(xué)習(xí)筆記12-數(shù)據(jù)庫的基本概念+數(shù)據(jù)庫的設(shè)計過程+概念設(shè)計+邏輯設(shè)計

    軟件設(shè)計師學(xué)習(xí)筆記12-數(shù)據(jù)庫的基本概念+數(shù)據(jù)庫的設(shè)計過程+概念設(shè)計+邏輯設(shè)計

    目錄 1.數(shù)據(jù)庫的基本概念 1.1數(shù)據(jù)庫的體系結(jié)構(gòu) 1.1.1常見數(shù)據(jù)庫 1.1.2分布式數(shù)據(jù)庫的特點 1.1.3分布式數(shù)據(jù)庫的透明性 1.1.4例題 1.2三級模式結(jié)構(gòu) 1.2.1三級模式概念圖 1.2.2例題 1.3數(shù)據(jù)倉庫 1.3.1數(shù)據(jù)倉庫的特點 1.3.2數(shù)據(jù)倉庫的過程 1.3.3例題 2.數(shù)據(jù)庫的設(shè)計過程 2.1設(shè)計過程概念圖 2

    2024年02月07日
    瀏覽(19)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包