国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Java中如何使用消息隊(duì)列實(shí)現(xiàn)異步(ActiveMQ,RabbitMQ,Kafka)

這篇具有很好參考價(jià)值的文章主要介紹了Java中如何使用消息隊(duì)列實(shí)現(xiàn)異步(ActiveMQ,RabbitMQ,Kafka)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、ActiveMQ 示例

在 Java 中,可以使用消息隊(duì)列實(shí)現(xiàn)異步處理。下面是一個(gè)簡(jiǎn)單的示例代碼,用于說(shuō)明如何使用 ActiveMQ 實(shí)現(xiàn)消息隊(duì)列異步處理:

  1. 添加 ActiveMQ 依賴(lài)

在 pom.xml 文件中添加以下依賴(lài):

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.16.3</version>
</dependency>
  1. 創(chuàng)建消息隊(duì)列

創(chuàng)建一個(gè)名為 “TestQueue” 的消息隊(duì)列,并配置 ActiveMQ 連接信息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TestQueue {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        
        // 創(chuàng)建連接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        
        // 創(chuàng)建會(huì)話(huà)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 創(chuàng)建隊(duì)列
        Queue queue = session.createQueue("TestQueue");
        
        // 創(chuàng)建生產(chǎn)者
        MessageProducer producer = session.createProducer(queue);
        
        // 發(fā)送消息
        for (int i = 0; i < 10; i++) {
            TextMessage message = session.createTextMessage("Message " + i);
            producer.send(message);
        }
        
        // 關(guān)閉連接
        connection.close();
    }
}
  1. 創(chuàng)建消息消費(fèi)者

創(chuàng)建一個(gè)消息消費(fèi)者,并實(shí)現(xiàn) MessageListener 接口,以便異步處理消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TestConsumer implements MessageListener {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        
        // 創(chuàng)建連接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        
        // 創(chuàng)建會(huì)話(huà)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 創(chuàng)建隊(duì)列
        Queue queue = session.createQueue("TestQueue");
        
        // 創(chuàng)建消費(fèi)者
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new TestConsumer());
        
        // 等待消息
        Thread.sleep(5000);
        
        // 關(guān)閉連接
        connection.close();
    }
    
    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("Received message: " + textMessage.getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
  1. 運(yùn)行代碼

在命令行中分別執(zhí)行 TestQueue 和 TestConsumer 兩個(gè)類(lèi),可以看到生產(chǎn)者向消息隊(duì)列發(fā)送了10條消息,并由消息消費(fèi)者異步處理這些消息。

使用消息隊(duì)列可以有效地實(shí)現(xiàn)異步處理,可以提高應(yīng)用程序的性能和并發(fā)能力。在上述示例代碼中,生產(chǎn)者通過(guò)創(chuàng)建消息并將其發(fā)送到隊(duì)列中,而消費(fèi)者則監(jiān)聽(tīng)隊(duì)列并異步處理接收到的消息。消息隊(duì)列可以起到解耦作用,使得生產(chǎn)者和消費(fèi)者之間的交互更加靈活和可靠,因?yàn)橄㈥?duì)列具有緩沖和異步處理的特點(diǎn),即使某個(gè)消費(fèi)者出現(xiàn)故障,也不會(huì)影響消息的傳遞和處理。

總的來(lái)說(shuō),消息隊(duì)列是一種重要的異步通信機(jī)制,能夠提高系統(tǒng)的可靠性和可伸縮性,適用于各種分布式系統(tǒng)和大規(guī)模應(yīng)用程序。除了 ActiveMQ,還有其他很多優(yōu)秀的消息隊(duì)列實(shí)現(xiàn),比如 RabbitMQ、Kafka 等。

二、RabbitMQ

  1. RabbitMQ 示例代碼

在 Java 中,可以使用 RabbitMQ 的 Java 客戶(hù)端實(shí)現(xiàn)消息隊(duì)列的異步處理。下面是一個(gè)簡(jiǎn)單的示例代碼,用于說(shuō)明如何使用 RabbitMQ 實(shí)現(xiàn)消息隊(duì)列異步處理:

  1. 添加 RabbitMQ 依賴(lài)

在 pom.xml 文件中添加以下依賴(lài):

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>
  1. 創(chuàng)建消息隊(duì)列

創(chuàng)建一個(gè)名為 “TestQueue” 的消息隊(duì)列,并配置 RabbitMQ 連接信息:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TestQueue {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        
        // 創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
        
        // 創(chuàng)建通道
        Channel channel = connection.createChannel();
        
        // 創(chuàng)建隊(duì)列
        channel.queueDeclare("TestQueue", false, false, false, null);
        
        // 發(fā)送消息
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            channel.basicPublish("", "TestQueue", null, message.getBytes());
        }
        
        // 關(guān)閉連接
        channel.close();
        connection.close();
    }
}
  1. 創(chuàng)建消息消費(fèi)者

創(chuàng)建一個(gè)消息消費(fèi)者,并實(shí)現(xiàn) Consumer 接口,以便異步處理消息:

import com.rabbitmq.client.*;

import java.io.IOException;

public class TestConsumer implements Consumer {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        
        // 創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
        
        // 創(chuàng)建通道
        Channel channel = connection.createChannel();
        
        // 創(chuàng)建隊(duì)列
        channel.queueDeclare("TestQueue", false, false, false, null);
        
        // 創(chuàng)建消費(fèi)者
        channel.basicConsume("TestQueue", true, new TestConsumer());
        
        // 等待消息
        Thread.sleep(5000);
        
        // 關(guān)閉連接
        channel.close();
        connection.close();
    }
    
    @Override
    public void handleConsumeOk(String consumerTag) {
        
    }
    
    @Override
    public void handleCancelOk(String consumerTag) {
        
    }
    
    @Override
    public void handleCancel(String consumerTag) throws IOException {
        
    }
    
    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        
    }
    
    @Override
    public void handleRecoverOk(String consumerTag) {
        
    }
    
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received message: " + message);
    }
}

三、 Kafka 示例

在 Java 中,可以使用 Kafka 的 Java 客戶(hù)端實(shí)現(xiàn)消息隊(duì)列的異步處理。下面是一個(gè)簡(jiǎn)單的示例代碼,用于說(shuō)明如何使用 Kafka 實(shí)現(xiàn)消息隊(duì)列異步處理:

  1. 添加 Kafka 依賴(lài)

在 pom.xml 文件中添加以下依賴(lài):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>
  1. 創(chuàng)建消息隊(duì)列

創(chuàng)建一個(gè)名為 “TestTopic” 的消息隊(duì)列,并配置 Kafka 連接信息:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class TestProducer {
    public static void main(String[] args) throws Exception {
        // 配置 Kafka 連接信息
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        // 創(chuàng)建生產(chǎn)者
        Producer<String, String> producer = new KafkaProducer<>(properties);
        
        // 發(fā)送消息
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            producer.send(new ProducerRecord<>("TestTopic", message));
        }
        
        // 關(guān)閉生產(chǎn)者
        producer.close();
    }
}
  1. 創(chuàng)建消息消費(fèi)者

創(chuàng)建一個(gè)消息消費(fèi)者,并實(shí)現(xiàn) Consumer 接口,以便異步處理消息:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-594383.html

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TestConsumer {
    public static void main(String[] args) throws Exception {
        // 配置 Kafka 連接信息
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "TestGroup");
        properties.put("enable.auto.commit", "false");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        // 創(chuàng)建消費(fèi)者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("TestTopic"));
        
        // 消費(fèi)消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
                
                // 手動(dòng)提交消費(fèi)位移
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
                consumer.commitSync(offsets);
            }
        }
    }
}

到了這里,關(guān)于Java中如何使用消息隊(duì)列實(shí)現(xiàn)異步(ActiveMQ,RabbitMQ,Kafka)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 分布式消息隊(duì)列:Kafka vs RabbitMQ vs ActiveMQ

    在現(xiàn)代分布式系統(tǒng)中,消息隊(duì)列是一種常見(jiàn)的異步通信模式,它可以幫助系統(tǒng)處理高并發(fā)、高可用性以及容錯(cuò)等問(wèn)題。在這篇文章中,我們將深入探討三種流行的分布式消息隊(duì)列:Apache Kafka、RabbitMQ和ActiveMQ。我們將討論它們的核心概念、算法原理、特點(diǎn)以及使用場(chǎng)景。 隨著

    2024年02月02日
    瀏覽(19)
  • 【Spring云原生系列】Spring RabbitMQ:異步處理機(jī)制的基礎(chǔ)--消息隊(duì)列 原理講解+使用教程

    【Spring云原生系列】Spring RabbitMQ:異步處理機(jī)制的基礎(chǔ)--消息隊(duì)列 原理講解+使用教程

    ???? 歡迎光臨,終于等到你啦 ???? ??我是 蘇澤 ,一位對(duì)技術(shù)充滿(mǎn)熱情的探索者和分享者。???? ??持續(xù)更新的專(zhuān)欄 《Spring 狂野之旅:從入門(mén)到入魔》 ?? 本專(zhuān)欄帶你從Spring入門(mén)到入魔 ? 這是蘇澤的個(gè)人主頁(yè)可以看到我其他的內(nèi)容哦???? 努力的蘇澤 http://suzee.blog.

    2024年03月15日
    瀏覽(30)
  • RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件

    RabbitMQ實(shí)現(xiàn)延遲消息,RabbitMQ使用死信隊(duì)列實(shí)現(xiàn)延遲消息,RabbitMQ延時(shí)隊(duì)列插件

    假設(shè)有一個(gè)業(yè)務(wù)場(chǎng)景:超過(guò)30分鐘未付款的訂單自動(dòng)關(guān)閉,這個(gè)功能應(yīng)該怎么實(shí)現(xiàn)? RabbitMQ使用死信隊(duì)列,可以實(shí)現(xiàn)消息的延遲接收。 隊(duì)列有一個(gè)消息過(guò)期屬性。就像豐巢超過(guò)24小時(shí)就收費(fèi)一樣,通過(guò)設(shè)置這個(gè)屬性,超過(guò)了指定事件的消息將會(huì)被丟棄。 這個(gè)屬性交:x-message

    2024年02月13日
    瀏覽(104)
  • Java RabbitMQ消息隊(duì)列簡(jiǎn)單使用

    Java RabbitMQ消息隊(duì)列簡(jiǎn)單使用

    消息隊(duì)列,即MQ,Message Queue。 消息隊(duì)列是典型的:生產(chǎn)者、消費(fèi)者模型。生產(chǎn)者不斷向消息隊(duì)列中生產(chǎn)消息,消費(fèi)者不斷的從隊(duì)列中獲取消息。因?yàn)橄⒌纳a(chǎn)和消費(fèi)都是異步的,而且只關(guān)心消息的發(fā)送和接收,沒(méi)有業(yè)務(wù)邏輯的侵入,這樣就實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者的解耦。

    2024年02月12日
    瀏覽(34)
  • 【Java】微服務(wù)——RabbitMQ消息隊(duì)列(SpringAMQP實(shí)現(xiàn)五種消息模型)

    【Java】微服務(wù)——RabbitMQ消息隊(duì)列(SpringAMQP實(shí)現(xiàn)五種消息模型)

    微服務(wù)間通訊有同步和異步兩種方式: 同步通訊:就像打電話(huà),需要實(shí)時(shí)響應(yīng)。 異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。 兩種方式各有優(yōu)劣,打電話(huà)可以立即得到響應(yīng),但是你卻不能跟多個(gè)人同時(shí)通話(huà)。發(fā)送郵件可以同時(shí)與多個(gè)人收發(fā)郵件,但是往往響應(yīng)會(huì)有延遲。 1.

    2024年02月08日
    瀏覽(18)
  • TP5簡(jiǎn)單使用RabbitMQ實(shí)現(xiàn)消息隊(duì)列

    在使用 RabbitMQ 之前,你要安裝好 RabbitMQ 服務(wù),具體安裝方法可以參考 windows下安裝RabbitMQ 1、安裝擴(kuò)展 進(jìn)入TP5 更目錄下,輸入命令安裝: composer require php-amqplib/php-amqplib 2、自定義命令 TP5 的自定義命令,這里也簡(jiǎn)單說(shuō)下。 第一步: 創(chuàng)建命令類(lèi)文件,新建 application/api/command

    2024年02月07日
    瀏覽(27)
  • 消息隊(duì)列中間件,RabbitMQ的使用,死信隊(duì)列,延遲隊(duì)列,利用枚舉實(shí)現(xiàn)隊(duì)列,交換機(jī),RountKey的聲明

    消息隊(duì)列中間件,RabbitMQ的使用,死信隊(duì)列,延遲隊(duì)列,利用枚舉實(shí)現(xiàn)隊(duì)列,交換機(jī),RountKey的聲明

    目錄 0.交換機(jī)種類(lèi)和區(qū)別 1.聲明隊(duì)列和交換機(jī)以及RountKey 2.初始化循環(huán)綁定 3.聲明交換機(jī) 4.監(jiān)聽(tīng)隊(duì)列 4.1 監(jiān)聽(tīng)普通隊(duì)列 4.2監(jiān)聽(tīng)死信隊(duì)列 ?5.削峰填谷的實(shí)現(xiàn) Direct Exchange(直連交換機(jī)) : 直連交換機(jī)將消息發(fā)送到與消息的路由鍵完全匹配的隊(duì)列。它是最簡(jiǎn)單的交換機(jī)類(lèi)型之一。

    2024年04月23日
    瀏覽(587)
  • 【學(xué)習(xí)日記2023.6.19】 之 RabbitMQ服務(wù)異步通信_(tái)消息可靠性_死信交換機(jī)_惰性隊(duì)列_MQ集群

    【學(xué)習(xí)日記2023.6.19】 之 RabbitMQ服務(wù)異步通信_(tái)消息可靠性_死信交換機(jī)_惰性隊(duì)列_MQ集群

    消息隊(duì)列在使用過(guò)程中,面臨著很多實(shí)際問(wèn)題需要思考: 消息從發(fā)送,到消費(fèi)者接收,會(huì)經(jīng)歷多個(gè)過(guò)程: 其中的每一步都可能導(dǎo)致消息丟失,常見(jiàn)的丟失原因包括: 發(fā)送時(shí)丟失: 生產(chǎn)者發(fā)送的消息未送達(dá)exchange 消息到達(dá)exchange后未到達(dá)queue MQ宕機(jī),queue將消息丟失 consumer接收

    2024年02月11日
    瀏覽(98)
  • 消息隊(duì)列-RabbitMQ:延遲隊(duì)列、rabbitmq 插件方式實(shí)現(xiàn)延遲隊(duì)列、整合SpringBoot

    消息隊(duì)列-RabbitMQ:延遲隊(duì)列、rabbitmq 插件方式實(shí)現(xiàn)延遲隊(duì)列、整合SpringBoot

    1、延遲隊(duì)列概念 延時(shí)隊(duì)列內(nèi)部是有序的 , 最重要的特性 就體現(xiàn)在它的 延時(shí)屬性 上,延時(shí)隊(duì)列中的元素是希望在指定時(shí)間到了以后或之前取出和處理,簡(jiǎn)單來(lái)說(shuō), 延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列。 延遲隊(duì)列使用場(chǎng)景: 訂單在十分鐘之內(nèi)未支付則

    2024年02月22日
    瀏覽(20)
  • Java開(kāi)發(fā) - 消息隊(duì)列之RabbitMQ初體驗(yàn)

    Java開(kāi)發(fā) - 消息隊(duì)列之RabbitMQ初體驗(yàn)

    目錄 前言 RabbitMQ 什么是RabbitMQ RabbitMQ特點(diǎn) 安裝啟動(dòng) RabbitMQ和Kafka的消息收發(fā)區(qū)別 RabbitMQ使用案例 添加依賴(lài) 添加配置 創(chuàng)建RabbitMQ配置類(lèi) RabbitMQ消息的發(fā)送 RabbitMQ消息的接收 測(cè)試 結(jié)語(yǔ) 前一篇,我們學(xué)習(xí)了Kafka的基本使用,這一篇,我們來(lái)學(xué)習(xí)RabbitMQ。他們作為消息隊(duì)列本身都具

    2024年02月03日
    瀏覽(21)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包