目錄
1、生產(chǎn)者-消費者模型是什么
2、Java中的實現(xiàn)
3、應(yīng)用于消息隊列
3.1 引入依賴
3.2?rabbitmq網(wǎng)站新建隊列queue
3.3 模塊中配置application.yml
3.4 生產(chǎn)者實現(xiàn)類
3.5 單元測試,發(fā)送msg到rabbitmq的隊列(my_simple_queue)
3.6 消費者實現(xiàn)類
3.7?從rabbitmq隊列(my_simple_queue)消費數(shù)據(jù)
3.8 隊列的配置類
小結(jié)
本文是RabbitMQ初入門-CSDN博客的進一步拓展,著重介紹該模型在消息隊列(如rabbitmq)中的應(yīng)用。
1、生產(chǎn)者-消費者模型是什么
首先,生產(chǎn)者-消費者模型是一種常見的并發(fā)編程模型,用于解決多線程或多進程環(huán)境下的數(shù)據(jù)共享與同步問題。在這個模型中,生產(chǎn)者負責生成數(shù)據(jù),并將數(shù)據(jù)放入一個共享的緩沖區(qū)中,而消費者則從緩沖區(qū)中取出數(shù)據(jù)進行處理。
圖片來源:Java多線程之生產(chǎn)者消費者模式詳解_java_腳本之家
生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。
這個阻塞隊列就是用來給生產(chǎn)者和消費者解耦的。縱觀大多數(shù)設(shè)計模式,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類。在學(xué)習(xí)一些設(shè)計模式的過程中,如果先找到這個模式的第三者,能幫助我們快速熟悉一個設(shè)計模式。(引自鏈接:Java多線程之生產(chǎn)者和消費者模型 - 簡書)
生產(chǎn)者-消費者模型通常包含以下幾個關(guān)鍵元素:
-
生產(chǎn)者:負責生成數(shù)據(jù)并放入緩沖區(qū)。生產(chǎn)者不斷地生成數(shù)據(jù),直到達到某個條件才停止。一般情況下,生產(chǎn)者在向緩沖區(qū)放入數(shù)據(jù)之前需要先檢查緩沖區(qū)是否已滿,如果已滿則等待。
-
消費者:負責從緩沖區(qū)中取出數(shù)據(jù)并進行處理。消費者不斷地從緩沖區(qū)中取出數(shù)據(jù),直到達到某個條件才停止。一般情況下,消費者在從緩沖區(qū)取出數(shù)據(jù)之前需要先檢查緩沖區(qū)是否為空,如果為空則等待。
-
緩沖區(qū):作為生產(chǎn)者和消費者之間的共享數(shù)據(jù)結(jié)構(gòu),用于存儲生產(chǎn)者生成的數(shù)據(jù)。緩沖區(qū)的大小限制了生產(chǎn)者和消費者之間的數(shù)據(jù)傳輸量,它可以是一個隊列、堆棧、循環(huán)緩沖區(qū)等。
-
同步機制:用于保護緩沖區(qū)的訪問,避免生產(chǎn)者和消費者同時對緩沖區(qū)進行讀寫操作而導(dǎo)致的數(shù)據(jù)不一致性。常見的同步機制包括互斥鎖(mutex)、條件變量(condition variable)、信號量(semaphore)等。
生產(chǎn)者-消費者模型的核心思想是通過合理地協(xié)調(diào)生產(chǎn)者和消費者的工作,實現(xiàn)數(shù)據(jù)的有序生成和處理。通過使用適當?shù)耐綑C制,可以保證生產(chǎn)者和消費者之間的互斥訪問和協(xié)調(diào),避免數(shù)據(jù)競爭和死鎖等并發(fā)問題。
在Java中,生產(chǎn)者-消費者模型通常是通過多線程來實現(xiàn)的。生產(chǎn)者線程負責生成數(shù)據(jù),將數(shù)據(jù)放入共享的緩沖區(qū)中;消費者線程則從緩沖區(qū)中取出數(shù)據(jù)進行處理。為了保證生產(chǎn)者和消費者之間的同步和互斥,可以使用Java提供的同步機制,例如synchronized關(guān)鍵字、ReentrantLock類、Condition接口等。
2、Java中的實現(xiàn)
首先,可以把每個生產(chǎn)者和消費者各看成是一個線程,做如下定義:
生產(chǎn)者
public class ProduceThread extends Thread{
private IKFC kfc;
public ProduceThread(String name,IKFC kfc) {
super(name);
this.kfc = kfc;
}
@Override
public void run() {
while(true){
try {
kfc.produce(getName());
sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消費者
public class ConsumerThread extends Thread{
private IKFC kfc;
public ConsumerThread(String name, IKFC kfc) {
this.kfc = kfc;
}
@Override
public void run() {
while(true){
try {
kfc.consume(getName());
sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
然后,可以通過synchronized方法, wait(), notifyAll()實現(xiàn)
這種方法等于使用this自帶的鎖來進行同步,具體辦法是將入隊和出隊設(shè)成syncrhronized。生產(chǎn)者會在入隊時(得到鎖之后)檢查隊列是否為滿,如果滿了,就釋放掉鎖并進入阻塞(wait())。等到隊列有了新的空位,消費者通過notifyAll()喚醒所有線程,此時被喚醒的生產(chǎn)者再次檢查隊列,發(fā)現(xiàn)了新的位置,就可以再繼續(xù)將產(chǎn)品入隊了,入隊完后,生產(chǎn)者會用notifyAll()通知消費者去消費。相對的,消費者也會在出隊時等待直至隊列不為空,出隊完通知。(引自鏈接:java生產(chǎn)消費者模式 java實現(xiàn)生產(chǎn)者消費者模型_mob6454cc6c8549的技術(shù)博客_51CTO博客)
實現(xiàn)類代碼:
public class KFCImpl implements IKFC {
private Queue<Food> queue = new LinkedBlockingQueue<>();
private final int MAX_SIZE = 10;
@Override
public synchronized void produce(String name) throws InterruptedException {
if (queue.size() >= MAX_SIZE) {
System.out.println("[生產(chǎn)者" + name + "] KFC生成達到上限,停止生成......");
wait();
} else {
Food food = new Food("上校雞塊");
queue.add(food);
System.out.println("[生產(chǎn)者" + name + "] 生成一個:" + food.getName() + ",KFC有食物:" + queue.size() + "個");
//喚醒等待的線程來消費
notifyAll();
}
}
@Override
public synchronized void consume(String name) throws InterruptedException {
if (queue.isEmpty()) {
System.out.println("[消費者" + name + "] KFC食物已空,消費者停止消費......");
wait();
} else {
Food food = queue.poll();
System.out.println("[消費者" + name + "] 消費一個:" + food.getName() + ",KFC有食物:" + queue.size() + "個");
//喚醒等待的線程來消費
notifyAll();
}
}
}
運行測試
public class Main {
public static void main(String[] args) {
IKFC kfc = new KFCImpl();
Thread p1= new ProduceThread("A",kfc);
Thread p2= new ProduceThread("B",kfc);
Thread p3= new ProduceThread("C",kfc);
Thread c1 = new ConsumerThread("X",kfc);
Thread c2 = new ConsumerThread("Y",kfc);
Thread c3 = new ConsumerThread("T",kfc);
Thread c4 = new ConsumerThread("Z",kfc);
Thread c5 = new ConsumerThread("K",kfc);
p1.start();
p2.start();
p3.start();
c1.start();
c2.start();
c3.start();
c4.start();
c5.start();
}
}
測試結(jié)果,生產(chǎn)和消費交替進行
[生產(chǎn)者A] 生成一個:上校雞塊,KFC有食物:1個
[生產(chǎn)者B] 生成一個:上校雞塊,KFC有食物:2個
[生產(chǎn)者C] 生成一個:上校雞塊,KFC有食物:3個
[消費者Thread-2] 消費一個:上校雞塊,KFC有食物:2個
[生產(chǎn)者B] 生成一個:上校雞塊,KFC有食物:3個
[生產(chǎn)者C] 生成一個:上校雞塊,KFC有食物:4個
[生產(chǎn)者A] 生成一個:上校雞塊,KFC有食物:5個
[消費者Thread-3] 消費一個:上校雞塊,KFC有食物:4個
[消費者Thread-4] 消費一個:上校雞塊,KFC有食物:3個
[消費者Thread-1] 消費一個:上校雞塊,KFC有食物:2個
[消費者Thread-0] 消費一個:上校雞塊,KFC有食物:1個
[消費者Thread-2] 消費一個:上校雞塊,KFC有食物:0個
[生產(chǎn)者B] 生成一個:上校雞塊,KFC有食物:1個
3、應(yīng)用于消息隊列
在消息隊列中,生產(chǎn)者-消費者模型也被廣泛應(yīng)用。消息隊列是一種高效的消息傳遞機制,它可以實現(xiàn)不同應(yīng)用程序或服務(wù)之間的異步通信。在消息隊列中,生產(chǎn)者向隊列中發(fā)送消息,而消費者則從隊列中接收消息并進行處理。消息隊列通常具有以下特點:
-
可靠性:消息隊列通常使用持久化策略,可以保證消息在發(fā)送和接收過程中的可靠性和安全性。
-
異步性:生產(chǎn)者和消費者可以獨立運行,不需要等待對方的響應(yīng),從而提高系統(tǒng)的吞吐量和響應(yīng)速度。
-
解耦性:消息隊列可以實現(xiàn)不同模塊之間的解耦,降低應(yīng)用程序的復(fù)雜度和耦合度。
-
擴展性:消息隊列可以根據(jù)需求動態(tài)擴展,支持多個生產(chǎn)者和消費者并發(fā)訪問。
在消息隊列中,生產(chǎn)者-消費者模型可以通過使用不同的消息隊列實現(xiàn)。常見的消息隊列包括ActiveMQ、RabbitMQ、Kafka等,它們提供了豐富的API和特性,可以滿足不同場景下的需求。例如,ActiveMQ支持JMS規(guī)范,提供了消息確認、持久化、事務(wù)等特性;RabbitMQ支持AMQP協(xié)議,具有高可用性、可擴展性等特點;Kafka支持高吞吐量、分布式部署等特性,適合大數(shù)據(jù)處理和流式計算。
代碼實現(xiàn)
3.1 引入依賴
</dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2?rabbitmq網(wǎng)站新建隊列queue
3.3 模塊中配置application.yml
spring:
rabbitmq:
host: 192.168.***.***
port: 5672
username: admin
password: 123
logging:
level:
com.****.mq: debug
3.4 生產(chǎn)者實現(xiàn)類
@Service
public class ProducerServiceImpl implements IProducerService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendMessage(String msg) {
System.out.println("準備發(fā)送數(shù)據(jù)到mq:" + msg);
rabbitTemplate.convertAndSend("my_simple_queue", msg);
}
@Override
public void sendUser(User user) {
System.out.println("準備發(fā)送User對象數(shù)據(jù)到mq:" + user);
rabbitTemplate.convertAndSend("my_simple_queue",user);
}
}
3.5 單元測試,發(fā)送msg到rabbitmq的隊列(my_simple_queue)
3.6 消費者實現(xiàn)類
@Service
public class ConsumerServiceImpl implements IConsumerService {
//@RabbitListener(queues = "my_simple_queue")
@Override
public void consumerMessage(String msg) {
System.out.println("[消費者:]消費mq中的信息:" + msg);
}
@RabbitListener(queues = "my_simple_queue")
@Override
public void consumerUser(User user) {
System.out.println("[消費者:]消費mq中的user信息:" + user.getUsername());
}
}
3.7?從rabbitmq隊列(my_simple_queue)消費數(shù)據(jù)
3.8 隊列的配置類
@Configuration
public class RabbitMQConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//將對象轉(zhuǎn)換為json對象形式
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
}
小結(jié)
總之,生產(chǎn)者-消費者模型是一種重要的并發(fā)編程模型,在Java中和消息隊列中都得到了廣泛的應(yīng)用。通過合理地使用同步機制和消息隊列,可以提高系統(tǒng)的性能、可靠性和擴展性,實現(xiàn)高效的數(shù)據(jù)傳輸和處理。此模型在很多領(lǐng)域都有廣泛應(yīng)用,例如任務(wù)調(diào)度、消息隊列、事件驅(qū)動編程等,它能有效地解耦數(shù)據(jù)生成與處理的過程,并提高系統(tǒng)的可擴展性和資源利用率。
參考:
java生產(chǎn)消費者模式 java實現(xiàn)生產(chǎn)者消費者模型_mob6454cc6c8549的技術(shù)博客_51CTO博客
Java多線程之生產(chǎn)者和消費者模型 - 簡書
生產(chǎn)者消費者模型(學(xué)習(xí)筆記)——java多線程典型案例_java寫生產(chǎn)者消費者模型_未跑路的汪汪的博客-CSDN博客
Java多線程之生產(chǎn)者消費者模式詳解_java_腳本之家
感謝閱讀,碼字不易,多謝點贊!如有不當之處,歡迎反饋指出,感謝!文章來源:http://www.zghlxwxcb.cn/news/detail-734699.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-734699.html
到了這里,關(guān)于生產(chǎn)者-消費者模型的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!