国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【Java中間件】RocketMQ

這篇具有很好參考價值的文章主要介紹了【Java中間件】RocketMQ。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

RocketMQ

一、MQ概述

Message Queue,是一種提供消息隊列服務(wù)的中間件。提供了消息生產(chǎn)、存儲、消費全過程API的軟件系統(tǒng)。

MQ的作用

  • 限流削峰:當(dāng)用戶發(fā)送超量請求時,將請求暫存,以便后期慢慢處理。如果不使用MQ暫存直接請求到業(yè)務(wù)系統(tǒng)中容易引起系統(tǒng)崩潰。
  • 異步解耦:若上游系統(tǒng)和下游系統(tǒng)為同步調(diào)用,會大大降低系統(tǒng)的吞吐量和并發(fā)量。MQ層實現(xiàn)兩個系統(tǒng)之間的異步調(diào)用
  • 數(shù)據(jù)收集:分布式系統(tǒng)會產(chǎn)生海量數(shù)據(jù)流,如業(yè)務(wù)日志、監(jiān)控數(shù)據(jù)、用戶行為。針對這些數(shù)據(jù)流采集匯總,進行大數(shù)據(jù)分析。

主流應(yīng)用的MQ產(chǎn)品

  • Kafka:Scala/Java語言開發(fā)。特點是高吞吐量,但會丟數(shù)據(jù),常用與大數(shù)據(jù)領(lǐng)域的實時計算、日志采集等場景。不遵循任何MQ協(xié)議,使用自研協(xié)議。
  • RocketMQ:Java語言開發(fā)。經(jīng)過數(shù)年阿里雙十一考驗,性能與穩(wěn)定性非常高,功能全面。不遵循任何MQ協(xié)議,使用自研協(xié)議。開源版不如云上版(阿里商業(yè)版)

MQ常見協(xié)議

  • JMS:Java Messaging Service。Java平臺上有關(guān)MOM(Message Orientated Middleware)的技術(shù)規(guī)范。他便于Java應(yīng)用程序的消息交換,提供標(biāo)準(zhǔn)的接口簡化開發(fā)。ActiveMQ時典型實現(xiàn)

  • STOMP:Streaming Text Orientated Message Protocol。是一種MOM的簡單文本協(xié)議。STOMP提供一個可互操作的連接格式,允許 客戶端與任意STOMP消息代理進行交互。ActiveMQ時典型實現(xiàn)

  • AMQP:Advanced Message Queuing Protocol。一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn),是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn)。RabbitMQ是典型實現(xiàn)

  • MQTT:Message Queueing Telemetry Transport。IBM開發(fā)的一個即時通訊協(xié)議(二進制協(xié)議),主要用于服務(wù)器和低功耗IoT設(shè)備之間的通信

二、基本概念

主題(Topic):表示一類消息的集合(可以理解為消息的類型),每個消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。一個生產(chǎn)者可以同時發(fā)送多種Topic消息,而一個消費者只能接收一種Topic消息

標(biāo)簽(Tag):用于快速過濾消息

三、Linux部署RocketMQ服務(wù)

1、在官網(wǎng)下載編譯好的二進制壓縮包,版本5.0.0即可,上傳到Linux中

2、進行解壓

3、配置環(huán)境變量ROCKETMQ_HOME和NAMESRV_ADDR

【Java中間件】RocketMQ,Java中間件,java-rocketmq,java,中間件

4、配置bin目錄下的runserver.sh,根據(jù)實際情況修改JVM的內(nèi)存參數(shù)

【Java中間件】RocketMQ,Java中間件,java-rocketmq,java,中間件

5、配置bin目錄下的runbroker.sh,根據(jù)實際情況修改JVM的內(nèi)存參數(shù)

【Java中間件】RocketMQ,Java中間件,java-rocketmq,java,中間件

6、執(zhí)行nohup命令后臺運行RocketMQ服務(wù)(nameserver必須先啟動,broker需要再nameserver上注冊)

# 啟動nameserver
nohup bin/mqnamesrv &	

# 啟動broker
nohup bin/mqbroker -c [confFile] & # -c可指定加載的配置文件,默認為conf/broker.conf

# 查看日志rocketmq是否成功啟動
tail nohup.out	

# 查看進程
jps		

# 停止broker
sh bin/mqshutdown broker

# 停止namesrv
sh bin/mqshutdown namesrv

7、執(zhí)行命令測試(rocketmq提供的測試樣例,生產(chǎn)者會發(fā)送一千條消息)

bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

8、執(zhí)行命令測試(rocketmq提供的測試樣例,消費者會接受一千條消息)

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

四、RocketMQ API

生產(chǎn)者同步發(fā)送消息

public void test_SyncProducer() throws MQClientException {
    DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
    //設(shè)置注冊服務(wù)的ip地址的端口
    producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);
    //啟動生產(chǎn)者
    producer.start();


    for(int i=0; i<3; i++){
        try {
            // 封裝消息,設(shè)置topic,tag(用于消息快速過濾),消息數(shù)據(jù)
            Message message = new Message(
                "TopicTest",
                "TagA",
                "ID04287777",
                ("Hello, RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //同步發(fā)送消息并獲取發(fā)送結(jié)果,producer從broker獲取發(fā)送結(jié)果
            SendResult sendResult = producer.send(message);

            System.out.println(sendResult);

            Thread.sleep(1500);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }


    producer.shutdown();
}

生產(chǎn)者異步發(fā)送消息

public void test_AsyncProducer() throws Exception{
    DefaultMQProducer producer = new DefaultMQProducer(RocketMQConstant.PRODUCER_GROUP_NAME);
    producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);

    int messageCount = 10;

    final CountDownLatch countDownLatch = new CountDownLatch(messageCount);

    for(int i=0; i<messageCount; i++){
        final int index = i;
        // 封裝消息,設(shè)置topic,tag(用于消息快速過濾),消息數(shù)據(jù)
        Message message = new Message(
            "TopicTest",
            "TagA",
            "ID04287777",
            ("Hello, RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 異步發(fā)送消息,若broker有響應(yīng)會調(diào)用SendCallback中的方法
        producer.send(message, new SendCallback() {
            public void onSuccess(SendResult sendResult) {
                countDownLatch.countDown();
                System.out.println("    Send Message "+ index +" OK: "+sendResult);
            }

            public void onException(Throwable throwable) {
                countDownLatch.countDown();
                System.out.println("    Send Message "+ index +" Exception: "+throwable);
            }
        });

        //單向發(fā)送
        producer.sendOneway(message);

        System.out.println("Message "+index+" send done");
    }
    //在100條消息發(fā)送完后關(guān)閉
    countDownLatch.await(5, TimeUnit.SECONDS);
    producer.shutdown();
}

生產(chǎn)者單向發(fā)送消息

public void test_OneWayProducer() throws Exception{
    DefaultMQProducer producer = new DefaultMQProducer(RocketMQConstant.PRODUCER_GROUP_NAME);
    producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);

    int messageCount = 10;

    final CountDownLatch countDownLatch = new CountDownLatch(messageCount);

    for(int i=0; i<messageCount; i++){
        final int index = i;
        // 封裝消息,設(shè)置topic,tag(用于消息快速過濾),消息數(shù)據(jù)
        Message message = new Message(
            "TopicTest",
            "TagA",
            "ID04287777",
            ("Hello, RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

        //單向發(fā)送
        producer.sendOneway(message);

        System.out.println("Message "+index+" send done");
    }
    //在100條消息發(fā)送完后關(guān)閉
    countDownLatch.await(5, TimeUnit.SECONDS);
    producer.shutdown();
}

消費者推模式

public static void test_PushConsumer() throws Exception{
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");

    consumer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    //消費者訂閱的消息topic和tag(subExpression,*表示任意)
    consumer.subscribe("TopicTest", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            System.out.println("Receive New Message : "+list);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();

    System.out.println("Consumer Start...");
}

消費者拉模式

不同于推模式消費者,拉模式下需要手動管理消息隊列MessageQueue和偏移量offset的映射關(guān)系。但是最新的LitePullConsumer底層源碼已經(jīng)實現(xiàn)對mq和offset的管理,比較方便。

//拉模式消費者
public static void test_LitePullConsumer() throws Exception{
    DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(RocketMQConstant.CONSUMER_GROUP_NAME);
    litePullConsumer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR);
    litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    litePullConsumer.subscribe("TopicTest", "*");
    litePullConsumer.start();

    try {
        while(true){
            List<MessageExt> messageExts = litePullConsumer.poll();
            System.out.printf("%s%n", messageExts);
        }
    }finally {
        litePullConsumer.shutdown();
    }
}

RocketMQ傳遞對象,對象所屬類需要實現(xiàn)序列化接口,并且將對象轉(zhuǎn)換為字節(jié)數(shù)組存入消息體中。

順序消息

保證消息的局部有序(其中幾條消息的有序,不一定是全部消息都要有序),以防止受到網(wǎng)絡(luò)傳輸?shù)挠绊憽?/p>

實現(xiàn)原理

生產(chǎn)者將一組有序的消息一次發(fā)到同一個MessageQueue中(依靠隊列的特點保證局部有序性)。消費者消費完一個MessageQueue的消息后才會去消費下一個MessageQueue的消息。

public class OrderProducer {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME);
        try {
            producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);
            producer.start();
            for(int i=0; i<5; i++){
                //用于指定順序的id
                int orderId = i;

                for(int j=0; j<5; j++){
                    Message message = new Message(
                            WanfengConstant.ORDER_TOPIC,
                            "order_"+orderId,
                            "KEY"+orderId,
                            ("order_"+orderId+" step "+j).getBytes(RemotingHelper.DEFAULT_CHARSET)
                    );
                    //實現(xiàn)消息隊列選擇器對象,使同一個orderId的消息發(fā)送到同一個消息隊列
                    SendResult sendResult = producer.send(
                            message,
                            new MessageQueueSelector() {
                                @Override
                                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                    Integer id = (Integer) arg;
                                    int index = id % mqs.size();
                                    return mqs.get(index);
                                }
                            },
                            orderId
                    );
                    System.out.printf("%s%n", sendResult);
                }
            }

        }catch(Exception e){
            e.printStackTrace();
            producer.shutdown();
        }

    }
}
public class OrderConsumer {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(WanfengConstant.CONSUMER_GROUP_NAME);
        consumer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        try {
            consumer.subscribe(WanfengConstant.ORDER_TOPIC, "*");
            //實現(xiàn)順序消息監(jiān)聽者接口
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);
                    for(MessageExt messageExt : msgs){
                        System.out.println("Receive Message: " + new String(messageExt.getBody()));
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Start...");
        } catch (Exception e) {
            e.printStackTrace();
            consumer.shutdown();
        }
    }
}

廣播消息

生產(chǎn)者發(fā)送的消息推送給所有g(shù)roup的消費者

實現(xiàn)原理:將消費者設(shè)置MessageModel為廣播模式。

public class BroadcastConsumer {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(WanfengConstant.CONSUMER_GROUP_NAME);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //設(shè)定消息模式為廣播
        consumer.setMessageModel(MessageModel.BROADCASTING);
        try {
            consumer.subscribe(WanfengConstant.ARCHIVE_TOPIC, "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    msgs.forEach(messageExt -> {
                        Archive archive = (Archive) WanfengObjectUtil.bytesToObject(messageExt.getBody());
                        System.out.println("Receive Message : "+archive.getId());
                    });
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Broadcast Consumer Start...");
        }catch (Exception e){
            e.printStackTrace();
            consumer.shutdown();
        }
    }
}

若指定MessageModel為CLUSTERING,則生產(chǎn)者發(fā)送的消息會隨機指定消費者消費。

延遲消息

顧名思義就是消息發(fā)送到broker時延遲指定的時間后再發(fā)送給消費者。常用于定時發(fā)送

過濾消息

過濾消息通過tag實現(xiàn),在消費者端指定過濾的tag即可。

//消費者訂閱tag1或tag2的消息
consumer.subscribe("TopicTest", "tag1 || tag2");

在RocketMQ中,消費者指定過濾條件后,將其上推到Broker中,在Broker中進行tag過濾,以減少網(wǎng)絡(luò)IO,但同時也增加了Broker的繁忙。

事務(wù)消息

【Java中間件】RocketMQ,Java中間件,java-rocketmq,java,中間件

public class TransactionProducer {
    public static void main(String[] args) {
        TransactionMQProducer producer = new TransactionMQProducer(WanfengConstant.PRODUCER_GROUP_NAME);
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.out.println("[WANFENG-INFO] TransactionProducer.executeLocalTransaction(): 執(zhí)行成功...");

                String tags = msg.getTags();
                if (StringUtils.contains(tags, "TagA")) {
                    //消息提交(發(fā)送出去)
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.contains(tags, "TagB")) {
                    //消息回滾(丟掉消息)
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("[WANFENG-INFO] TransactionProducer.checkLocalTransaction(): 執(zhí)行成功...");
                String tags = msg.getTags();
                if (StringUtils.contains(tags, "TagC")) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }
        };
        ExecutorService executorService = new ThreadPoolExecutor(
                2,
                5,
                100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3)
        );
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        try {
            producer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }

        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        CountDownLatch countDownLatch = new CountDownLatch(9);
        for (int i = 0; i < 9; i++) {
            try {
                Message message = new Message("TopicTest", tags[i % tags.length], "Key" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(message, null);
                System.out.println(sendResult);
                Thread.sleep(1000);
                countDownLatch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            producer.shutdown();
        }


    }
}

ACL權(quán)限控制

ACL對用戶對Topic資源的訪問權(quán)限進行控制

在pom依賴中引入acl的依賴包

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>5.0.0</version>
</dependency>

在服務(wù)端的conf/broker.conf文件,添加配置,開啟acl

aclEnable=true

在服務(wù)端的conf/plain_acl.yml文件,配置具體權(quán)限規(guī)則(熱加載,不需要重啟mq)

accounts:
  - accessKey: RocketMQ #用戶名
    secretKey: 12345678 #密碼
    whiteRemoteAddress:   #訪問地址白名單
    admin: false	#是否為管理員(管理員可以訪問所有Topic)
    defaultTopicPerm: DENY #默認Topic訪問權(quán)限
    defaultGroupPerm: SUB  #默認組權(quán)限
    topicPerms:		#Topic對應(yīng)的權(quán)限,若這里找不到則采用defaultTopicPerm
      - topicA=DENY 	
      - topicB=PUB|SUB
      - topicC=SUB
    groupPerms:
      # the group should convert to retry topic
      - groupA=DENY
      - groupB=PUB|SUB
      - groupC=SUB

在創(chuàng)建生產(chǎn)者對象時需加入RPCHook(acl的用戶信息)

public class AclProducer {

    private static final String ACL_ACCESS_KEY = "RocketMQ";

    private static final String ACL_SECRET_KEY = "12345678";

    /**
     * 通過用戶名和密碼獲取RPCHook
     * @return
     */
    public static RPCHook getAclRPCHook(){
        return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));
    }

    public static void main(String[] args) throws MQClientException, InterruptedException {
        //創(chuàng)建生產(chǎn)者時加入用戶信息,即RPCHook
        DefaultMQProducer producer = new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME, getAclRPCHook());
        producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);
        producer.start();

        for (int i = 0; i < 20; i++) {
            try {
                Message message = new Message(
                        "TopicTest",
                        WanfengConstant.TAGS_NAME,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /*消息體轉(zhuǎn)換成二進制數(shù)組*/
                );
                SendResult sendResult = producer.send(message);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
    }
}

消息軌跡

Producer,Consumer,Broker處理消息的相關(guān)信息

消息軌跡的實現(xiàn)原理是MQ把消息軌跡都往RMQ_SYS_TRACE_TOPIC的Topic中放

在Broker端配置文件開啟消息軌跡

traceTopicEnable=true

創(chuàng)建生產(chǎn)者時指定enableMsgTrace參數(shù)為true,開啟消息軌跡。也可以指定customizedTraceTopic參數(shù)來自定義消息軌跡的Topic。文章來源地址http://www.zghlxwxcb.cn/news/detail-616237.html

public class TraceProducer {
    public static void main(String[] args) throws MQClientException {
        //指定enableMsgTrace參數(shù)為true,開啟消息軌跡
        DefaultMQProducer producer = new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME, true);
        producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR);
        producer.start();

        for (int i = 0; i < 20; i++) {
            try {
                Message message = new Message(
                        "TopicTest",
                        WanfengConstant.TAGS_NAME,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /*消息體轉(zhuǎn)換成二進制數(shù)組*/
                );
                SendResult sendResult = producer.send(message);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

到了這里,關(guān)于【Java中間件】RocketMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 中間件:RocketMQ安裝部署

    下載 配置 broker.conf 的brokerIP1 為公網(wǎng)ip 啟動命令: 查看集群狀態(tài) benchmark目錄下

    2024年02月12日
    瀏覽(19)
  • 消息隊列中間件 MetaQ/RocketMQ

    消息隊列中間件 MetaQ/RocketMQ

    推薦電子書:云原生架構(gòu)白皮書 2022版-藏經(jīng)閣-阿里云開發(fā)者社區(qū) (aliyun.com) 簡介—— 消息隊列中間件 MetaQ/RocketMQ 中間件 MetaQ 是一種基于隊列模型的消息中間件,MetaQ 據(jù)說最早是受 Kafka 的影響開發(fā)的,第一版的名字?\\\"metamorphosis\\\",是奧地利作家卡夫卡的名作——《變形記》。

    2024年02月14日
    瀏覽(95)
  • 中間件上云部署 rocketmq

    中間件上云部署 rocketmq

    Apache RocketMQ是一個分布式消息傳遞和流媒體平臺,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可伸縮性。 發(fā)布/訂閱消息傳遞模型 定期消息傳遞 按時間或偏移量進行消息回溯 日志中心流 大數(shù)據(jù)集成 在同一隊列中可靠的FIFO和嚴(yán)格的有序消息傳遞 有效的拉伸消費

    2024年02月16日
    瀏覽(19)
  • 消息中間件之RocketMQ源碼分析(十)

    消息中間件之RocketMQ源碼分析(十)

    啟動命令 nohup ./bin/mqnamesrv -c ./conf/namesrv.conf dev/null 21 通過腳本配置啟動基本參數(shù),比如配置文件路徑、JVM參數(shù),調(diào)用NamesrvStartup.main()方法,解析命令行的參數(shù),將處理好的參數(shù)轉(zhuǎn)化為Java實例,傳遞給NamesrvController實例 加載命令行傳遞的配置參數(shù),調(diào)用controller.initialize()方法初

    2024年02月20日
    瀏覽(21)
  • 分布式消息中間件RocketMQ的應(yīng)用

    分布式消息中間件RocketMQ的應(yīng)用

    所有代碼同步至GitCode:https://gitcode.net/ruozhuliufeng/test-rocketmq.git 普通消息 消息發(fā)送分類 ? Producer對于消息的發(fā)送方式也有多種選擇,不同的方式會產(chǎn)生不同的系統(tǒng)效果。 同步發(fā)送消息 ? 同步發(fā)送消息是指,Producer發(fā)出一條消息后,會在收到MQ返回的ACK之后才發(fā)下一條消息。

    2024年02月05日
    瀏覽(21)
  • Kafka、RabbitMQ、RocketMQ中間件的對比

    Kafka、RabbitMQ、RocketMQ中間件的對比

    消息中間件現(xiàn)在有不少,網(wǎng)上很多文章都對其做過對比,在這我對其做進一步總結(jié)與整理。 ? ? RocketMQ 淘寶內(nèi)部的交易系統(tǒng)使用了淘寶自主研發(fā)的Notify消息中間件,使用Mysql作為消息存儲媒介,可完全水平擴容,為了進一步降低成本,我們認為存儲部分可以進一步優(yōu)化,201

    2024年02月05日
    瀏覽(25)
  • 【Alibaba中間件技術(shù)系列】「RocketMQ技術(shù)專題」小白專區(qū)之領(lǐng)略一下RocketMQ基礎(chǔ)之最!

    【Alibaba中間件技術(shù)系列】「RocketMQ技術(shù)專題」小白專區(qū)之領(lǐng)略一下RocketMQ基礎(chǔ)之最!

    應(yīng)一些小伙伴們的私信,希望可以介紹一下RocketMQ的基礎(chǔ),那么我們現(xiàn)在就從0開始,進入RocketMQ的基礎(chǔ)學(xué)習(xí)及概念介紹,為學(xué)習(xí)和使用RocketMQ打好基礎(chǔ)! RocketMQ是一款快速地、可靠地、分布式、容易使用的消息中間件,由Alibaba開發(fā),其前身是 Metaq,Metaq 可以看成是linkedin的Ka

    2024年02月12日
    瀏覽(25)
  • 【消息中間件】RocketMQ消息重復(fù)消費場景及解決辦法

    【消息中間件】RocketMQ消息重復(fù)消費場景及解決辦法

    消息重復(fù)消費是各個MQ都會發(fā)生的常見問題之一,在一些比較敏感的場景下,重復(fù)消費會造成比較嚴(yán)重的后果,比如重復(fù)扣款等。 當(dāng)系統(tǒng)的調(diào)用鏈路比較長的時候,比如系統(tǒng)A調(diào)用系統(tǒng)B,系統(tǒng)B再把消息發(fā)送到RocketMQ中,在系統(tǒng)A調(diào)用系統(tǒng)B的時候,如果系統(tǒng)B處理成功,但是遲遲

    2024年02月05日
    瀏覽(41)
  • ActiveMQ、RabbitMQ、Kafka、RocketMQ消息中間件技術(shù)選型

    消息中間件是分布式系統(tǒng)中重要的組件之一,用于實現(xiàn)異步通信、解耦系統(tǒng)、提高系統(tǒng)可靠性和擴展性。在做消息中間件技術(shù)選型時,需要考慮多個因素,包括可靠性、性能、可擴展性、功能豐富性、社區(qū)支持和成本等。本文將五種流行的消息中間件技術(shù):ActiveMQ、RabbitMQ、

    2024年02月11日
    瀏覽(22)
  • 【消息中間件】詳解三大MQ:RabbitMQ、RocketMQ、Kafka

    【消息中間件】詳解三大MQ:RabbitMQ、RocketMQ、Kafka

    作者簡介 前言 博主之前寫過一個完整的MQ系列,包含RabbitMQ、RocketMQ、Kafka,從安裝使用到底層機制、原理。專欄地址: https://blog.csdn.net/joker_zjn/category_12142400.html?spm=1001.2014.3001.5482 本文是該系列的清單綜述,會拉通來聊一下三大MQ的特點和各種適合的場景。 目錄 1.概述 1.1.M

    2024年02月09日
    瀏覽(53)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包