国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

java阻塞隊(duì)列/kafka/spring整合kafka

這篇具有很好參考價值的文章主要介紹了java阻塞隊(duì)列/kafka/spring整合kafka。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

queue增加刪除元素

  • 增加元素
    • add方法在添加元素的時候,若超出了度列的長度會直接拋出異常:
    • put方法,若向隊(duì)尾添加元素的時候發(fā)現(xiàn)隊(duì)列已經(jīng)滿了會發(fā)生阻塞一直等待空間,以加入元素
    • offer方法在添加元素時,如果發(fā)現(xiàn)隊(duì)列已滿無法添加的話,會直接返回false
  • 刪除元素
    • poll: 若隊(duì)列為空,返回null。
    • remove:若隊(duì)列為空,拋出NoSuchElementException異常。
    • take:若隊(duì)列為空,發(fā)生阻塞,等待有元素

BlockingQueue:

  • 解決線程通信的問題
  • 阻塞方法:put、take

其他實(shí)現(xiàn)類:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue/ SynchronousQueue/ DelayQueue

BlockingQueue實(shí)例

package com.nowcoder.mycommunity;

import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTests {
    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

class Producer implements Runnable{

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for(int i = 0; i < 100; ++ i){
                queue.put(i);
                Thread.sleep(20);
                System.out.println(Thread.currentThread().getName() + "   producer" + queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable{

    public BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                queue.take();
                Thread.sleep(new Random().nextInt(1000));
                System.out.println(Thread.currentThread().getName() + "   consuer" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

kafka

  • kafka是一個分布式的流媒體平臺
  • 主要應(yīng)用:消息系統(tǒng)、日志收集、用戶行為追蹤、流式處理
  • 特點(diǎn):高吞吐量、消息持久化(存放在磁盤上,btw,磁盤順序讀寫速度并不慢)、高可靠性、高擴(kuò)展性
Broker

kafka的服務(wù)器,每一臺服務(wù)器稱為一個Broker

Zookeeper

管理其他集群,包括kafka的集群??梢詥为?dú)下載

Topic/ Partition/ Offset

消息隊(duì)列可能是一對多的形式,生產(chǎn)者將一條消息放在多個隊(duì)列中,然后消費(fèi)者從各自的隊(duì)列中取消息。
下圖為一個Topic,Topic中可能會含有很多Partition,Offset為Partition的索引
java阻塞隊(duì)列/kafka/spring整合kafka,java雜文,java,kafka,開發(fā)語言

Leader Replica/ Follower Replica

kafka的數(shù)據(jù)不止存儲一份,他會存為多份,即使某一個分區(qū)壞了還可以有備份。
leader Replica(祖副本):當(dāng)嘗試從分區(qū)獲取數(shù)據(jù)時,祖副本可以處理請求,返回?cái)?shù)據(jù)
Follower Replica(隨從副本):只能備份,不能響應(yīng)請求
如果祖副本掛掉,集群會從Follower Replica中選一個作為新的leader

kafka命令

官方文檔

配置

進(jìn)入到configure目錄下,修改consumer.properties

使用

進(jìn)入到kafka的目錄中

// 啟動zookeeper
> ./bin/zookeeper-server-start.sh config/zookeeper.properties 

// 啟動kafka
> ./bin/kafka-server-start.sh config/server.properties 

// --create:創(chuàng)建主題
// --bootstrap-server localhost:9092:在哪個服務(wù)器創(chuàng)建主題,kafka默認(rèn)端口為9092
// --replication-factor 1:副本為1
// --partitions 1:分區(qū)為1
// --topic test:主題的名字
> ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.

// 查看該服務(wù)器上的主題
> ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092            
test

// 創(chuàng)建生產(chǎn)者向某個服務(wù)器的某個主題中發(fā)消息
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test        
>hello
>world

// 創(chuàng)建一個消費(fèi)者,讀取某個服務(wù)器上某個主題下的消息隊(duì)列,從頭開始讀取
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world

Spring整合Kafka

引入依賴

pom.xml文章來源地址http://www.zghlxwxcb.cn/news/detail-524053.html

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.7</version>
</dependency>
配置Kafka
  • 配置server
  • 配置consumer
# Kafka Properties
# 服務(wù)器地址
spring.kafka.bootstrap-servers==localhost:9092
#消費(fèi)者id,可以在consumer.properties查看
spring.kafka.consumer.group.id=mycommunity-consumer-group
# 是否自動提交
spring.kafka.consumer.enable-auto-commit=true
# 自動提交的時間間隔,單位毫秒
spring.kafka.consumer.auto-commit-interval=3000
訪問Kafka
  • producer
  • consumer

Spring整合Kafka的例子

package com.nowcoder.mycommunity;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = MyCommunityApplication.class)
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka(){
        kafkaProducer.sendMessage("test", "hello");
        kafkaProducer.sendMessage("test", "world");

        try {
            Thread.sleep(1000*10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@Component
class KafkaProducer{

    @Autowired
    public KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content){
        kafkaTemplate.send(topic, content);
    }
}

@Component
class KafkaConsumer{

	// 加上listener注解,Spring會自動注入
    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record){
        System.out.println(record.value());
    }
}

到了這里,關(guān)于java阻塞隊(duì)列/kafka/spring整合kafka的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 非阻塞重試與 Spring Kafka 的集成測試

    ????????如何為啟用重試和死信發(fā)布的消費(fèi)者的 Spring Kafka 實(shí)現(xiàn)編寫集成測試。 Kafka 中的非阻塞重試是通過為主主題配置重試主題來完成的。如果需要,還可以配置其他死信主題。如果所有重試均已用盡,事件將轉(zhuǎn)發(fā)至 DLT。公共領(lǐng)域提供了大量資源來了解技術(shù)細(xì)節(jié)。? 在

    2024年02月12日
    瀏覽(12)
  • 充分了解java阻塞隊(duì)列機(jī)制

    充分了解java阻塞隊(duì)列機(jī)制

    BlockingQueue繼承了Queue的接口,是隊(duì)列的一種,并且和Queue相比,BlockingQueue是線程安全的,多用于并發(fā)+并行編程,對于線程安全問題可以很好的解決. 下面是實(shí)現(xiàn)BlockingQueue接口的類 怕大家理解不方便,俺通過思維導(dǎo)圖的方式給大家呈現(xiàn) 阻塞隊(duì)列的典型例子就是BlockingQueue接口的實(shí)現(xiàn)類

    2024年02月15日
    瀏覽(23)
  • kafka延時隊(duì)列原理,Java開發(fā)中遇到最難的問題

    kafka延時隊(duì)列原理,Java開發(fā)中遇到最難的問題

    Dubbo中zookeeper做注冊中心,如果注冊中心集群都掛掉,發(fā)布者和訂閱者之間還能通信么? Dubbo 的整體架構(gòu)設(shè)計(jì)有哪些分層? 什么是 Spring Boot?以及Spring Boot的優(yōu)劣勢? 你如何理解 Spring Boot 中的 Starters? 服務(wù)注冊和發(fā)現(xiàn)是什么意思?Spring Cloud 如何實(shí)現(xiàn)? Spring Cloud斷路器的作用

    2024年03月21日
    瀏覽(26)
  • 使用 Spring Kafka 進(jìn)行非阻塞重試的集成測試

    ?Kafka的非阻塞重試是通過為主題配置重試主題來實(shí)現(xiàn)的。如果需要,還可以配置額外的死信主題。如果所有重試都耗盡,事件將被轉(zhuǎn)發(fā)到DLT。在公共領(lǐng)域中有很多資源可用于了解技術(shù)細(xì)節(jié)。對于代碼中的重試機(jī)制編寫集成測試確實(shí)是一項(xiàng)具有挑戰(zhàn)性的工作。以下是一些測試

    2024年02月10日
    瀏覽(20)
  • 深入淺出Java多線程(十三):阻塞隊(duì)列

    大家好,我是你們的老伙計(jì)秀才!今天帶來的是[深入淺出Java多線程]系列的第十三篇內(nèi)容:阻塞隊(duì)列。大家覺得有用請點(diǎn)贊,喜歡請關(guān)注!秀才在此謝過大家了?。?! 在多線程編程的世界里,生產(chǎn)者-消費(fèi)者問題是一個經(jīng)典且頻繁出現(xiàn)的場景。設(shè)想這樣一個情況:有一群持續(xù)

    2024年03月20日
    瀏覽(28)
  • 【Java】多線程案例(單例模式,阻塞隊(duì)列,定時器,線程池)

    【Java】多線程案例(單例模式,阻塞隊(duì)列,定時器,線程池)

    ?? Author: 老九 ?? 個人博客:老九的CSDN博客 ?? 個人名言:不可控之事 樂觀面對 ?? 系列專欄: 單例模式是設(shè)計(jì)模式之一。代碼當(dāng)中的某個類,只能有一個實(shí)例,不能有多個。單例模式分為:餓漢模式和懶漢模式 餓漢模式表示很著急,就想吃完飯剩下很多碗,然后一

    2024年02月06日
    瀏覽(51)
  • java高并發(fā)系列 - 第25天:掌握J(rèn)UC中的阻塞隊(duì)列

    這是java高并發(fā)系列第25篇文章。 環(huán)境:jdk1.8。 本文內(nèi)容 掌握Queue、BlockingQueue接口中常用的方法 介紹6中阻塞隊(duì)列,及相關(guān)場景示例 重點(diǎn)掌握4種常用的阻塞隊(duì)列 Queue接口 隊(duì)列是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),java中用Queue接口來表示隊(duì)列。 Queue接口中定義了6個方法:

    2024年02月14日
    瀏覽(58)
  • SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列

    SpringBoot整合SpringCloudStream3.1+版本的Kafka死信隊(duì)列

    SpringBoot整合SpringCloudStream3.1+版本Kafka 添加死信隊(duì)列配置文件,添加對應(yīng)channel 通道綁定配置對應(yīng)的channel位置添加重試配置 Kafka基本配置(application-mq.yml) 創(chuàng)建死信隊(duì)列配置文件(application-dql.yml) 注意:這里的valueSerde使用了對象類型,需要搭配 application/json 使用,consumer接收

    2024年02月16日
    瀏覽(20)
  • Java 多線程系列Ⅳ(單例模式+阻塞式隊(duì)列+定時器+線程池)

    Java 多線程系列Ⅳ(單例模式+阻塞式隊(duì)列+定時器+線程池)

    設(shè)計(jì)模式就是軟件開發(fā)中的“棋譜”,軟件開發(fā)中也有很多常見的 “問題場景”。針對這些問題場景,大佬們總結(jié)出了一些固定的套路。按照這些套路來實(shí)現(xiàn)代碼可能不會很好,但至少不會很差。當(dāng)前階段我們需要掌握兩種設(shè)計(jì)模式: (1)單例模式 (2)工廠模式 概念/特征

    2024年02月09日
    瀏覽(25)
  • 【Java系列】多線程案例學(xué)習(xí)——基于阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型

    【Java系列】多線程案例學(xué)習(xí)——基于阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型

    個人主頁:兜里有顆棉花糖 歡迎 點(diǎn)贊?? 收藏? 留言? 加關(guān)注??本文由 兜里有顆棉花糖 原創(chuàng) 收錄于專欄【Java系列專欄】【JaveEE學(xué)習(xí)專欄】 本專欄旨在分享學(xué)習(xí)JavaEE的一點(diǎn)學(xué)習(xí)心得,歡迎大家在評論區(qū)交流討論?? 什么是阻塞式隊(duì)列(有兩點(diǎn)): 第一點(diǎn):當(dāng)隊(duì)列滿的時候

    2024年02月04日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包