国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

ActiveMQ 02 常用API

這篇具有很好參考價值的文章主要介紹了ActiveMQ 02 常用API。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Active MQ 02

常用API

事務

session.commit();
session.rollback();

用來提交/回滾事務

Purge

清理消息

簽收模式

簽收代表接收端的session已收到消息的一次確認,反饋給broker

ActiveMQ支持自動簽收與手動簽收

Session.AUTO_ACKNOWLEDGE

當客戶端從receiver或onMessage成功返回時,Session自動簽收客戶端的這條消息的收條。

Session.CLIENT_ACKNOWLEDGE

客戶端通過調(diào)用消息(Message)的acknowledge方法簽收消息。在這種情況下,簽收發(fā)生在Session層面:簽收一個已經(jīng)消費的消息會自動地簽收這個Session所有已消費的收條。

Session.DUPS_OK_ACKNOWLEDGE

Session不必確保對傳送消息的簽收,這個模式可能會引起消息的重復,但是降低了Session的開銷,所以只有客戶端能容忍重復的消息,才可使用。

持久化

默認持久化是開啟的

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

優(yōu)先級

可以打亂消費順序

producer.setPriority

配置文件需要指定使用優(yōu)先級的目的地

<policyEntry queue="queue1" prioritizedMessages="true" />

消息超時/過期

producer.setTimeToLive

設置了消息超時的消息,消費端在超時后無法在消費到此消息。

給消息設置一個超時時間 -> 死信隊列 -> 拿出來 -> 重發(fā)

死信

此類消息會進入到ActiveMQ.DLQ隊列且不會自動清除,稱為死信

此處有消息堆積的風險

修改死信隊列名稱
			  <policyEntry queue="f" prioritizedMessages="true" >
				<deadLetterStrategy> 

					<individualDeadLetterStrategy   queuePrefix="DLxxQ." useQueueForQueueMessages="true" /> 

				</deadLetterStrategy> 
			  </policyEntry>

useQueueForQueueMessages: 設置使用隊列保存死信,還可以設置useQueueForTopicMessages,使用Topic來保存死信

讓非持久化的消息也進入死信隊列
			<individualDeadLetterStrategy   queuePrefix="DLxxQ." useQueueForQueueMessages="true"  processNonPersistent="true" /> 

processNonPersistent=“true”

過期消息不進死信隊列
<individualDeadLetterStrategy   processExpired="false"  /> 

獨占消費者

	Queue queue = session.createQueue("xxoo?consumer.exclusive=true");

還可以設置優(yōu)先級

Queue queue = session.createQueue("xxoo?consumer.exclusive=true&consumer.priority=10");

消息類型

object

發(fā)送端
		Girl girl = new Girl("qiqi",25,398.0);
		
		Message message = session.createObjectMessage(girl);
接受端
		if(message instanceof ActiveMQObjectMessage) {
			
			Girl girl = (Girl)((ActiveMQObjectMessage)message).getObject();
			
			System.out.println(girl);
			System.out.println(girl.getName());
		}

如果遇到此類報錯

Exception in thread "main" javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
	at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
	at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:213)
	at com.mashibing.mq.Receiver.main(Receiver.java:65)
Caused by: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
	at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112)
	at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:211)
	... 1 more

需要添加信任

		connectionFactory.setTrustedPackages(
				new ArrayList<String>(
						Arrays.asList(
								new String[]{
										Girl.class.getPackage().getName()
										}
								
								)
						)
				
				);

bytesMessage

發(fā)送端
		BytesMessage bytesMessage = session.createBytesMessage();
        bytesMessage.writeBytes("str".getBytes());
        bytesMessage.writeUTF("哈哈");
接受端
		if(message instanceof BytesMessage) {
			BytesMessage bm = (BytesMessage)message;
			
			 byte[] b = new byte[1024];
             int len = -1;
             while ((len = bm.readBytes(b)) != -1) {
                 System.out.println(new String(b, 0, len));
             }
		}

還可以使用ActiveMQ給提供的便捷方法,但要注意讀取和寫入的順序

bm.readBoolean()
bm.readUTF()
寫入文件
                    FileOutputStream out = null;
                    try {
                        out = new FileOutputStream("d:/aa.txt");
                    } catch (FileNotFoundException e2) {
                        e2.printStackTrace();
                    }
                    byte[] by = new byte[1024];
                    int len = 0 ;
                    try {
                        while((len = bm.readBytes(by))!= -1){
                            out.write(by,0,len);
                        }
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }

MapMessage

發(fā)送端
		MapMessage mapMessage = session.createMapMessage();
      
		
		mapMessage.setString("name","lucy");
        mapMessage.setBoolean("yihun",false);
		mapMessage.setInt("age", 17);
		
		producer.send(mapMessage);
接收端
		Message message = consumer.receive();
		MapMessage mes = (MapMessage) message;
		
		System.out.println(mes);

		System.out.println(mes.getString("name"));

消息發(fā)送原理

同步與異步

開啟事務 關(guān)閉事務
持久化 異步 同步
非持久化 異步 異步

我們可以通過以下幾種方式來設置異步發(fā)送:

		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				"admin",
				"admin",
				"tcp://localhost:61616"
				);
		// 2.獲取一個向ActiveMQ的連接
		connectionFactory.setUseAsyncSend(true);
		ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();
		connection.setUseAsyncSend(true);

消息堆積

producer每發(fā)送一個消息,統(tǒng)計一下發(fā)送的字節(jié)數(shù),當字節(jié)數(shù)達到ProducerWindowSize值時,需要等待broker的確認,才能繼續(xù)發(fā)送。

brokerUrl中設置: tcp://localhost:61616?jms.producerWindowSize=1048576

destinationUri中設置: myQueue?producer.windowSize=1048576

延遲消息投遞

首先在配置文件中開啟延遲和調(diào)度

schedulerSupport=“true”

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

延遲發(fā)送

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000);

帶間隔的重復發(fā)送

		long delay = 10 * 1000;
		long period = 2 * 1000;
		int repeat = 9;
		message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
		message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
		message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
		createProducer.send(message);

Cron表達式定時發(fā)送

Cron表達式是一個字符串,字符串以5或6個空格隔開,分為6或7個域,每一個域代表一個含義,Cron有如下兩種語法格式:

Seconds Minutes Hours DayofMonth Month DayofWeek Year或

Seconds Minutes Hours DayofMonth Month DayofWeek

每一個域可出現(xiàn)的字符如下:

Seconds:可出現(xiàn)", - * /"四個字符,有效范圍為0-59的整數(shù)

Minutes:可出現(xiàn)", - * /"四個字符,有效范圍為0-59的整數(shù)

Hours:可出現(xiàn)", - * /"四個字符,有效范圍為0-23的整數(shù)

DayofMonth:可出現(xiàn)", - * / ? L W C"八個字符,有效范圍為0-31的整數(shù)

Month:可出現(xiàn)", - * /"四個字符,有效范圍為1-12的整數(shù)或JAN-DEc

DayofWeek:可出現(xiàn)", - * / ? L C #"四個字符,有效范圍為1-7的整數(shù)或SUN-SAT兩個范圍。1表示星期天,2表示星期一, 依次類推

Year:可出現(xiàn)", - * /"四個字符,有效范圍為1970-2099年

每一個域都使用數(shù)字,但還可以出現(xiàn)如下特殊字符,它們的含義是:

(1):表示匹配該域的任意值,假如在Minutes域使用, 即表示每分鐘都會觸發(fā)事件。

(2)?:只能用在DayofMonth和DayofWeek兩個域。它也匹配域的任意值,但實際不會。因為DayofMonth和 DayofWeek會相互影響。例如想在每月的20日觸發(fā)調(diào)度,不管20日到底是星期幾,則只能使用如下寫法: 13 13 15 20 * ?, 其中最后一位只能用?,而不能使用*,如果使用*表示不管星期幾都會觸發(fā),實際上并不是這樣。

(3)-:表示范圍,例如在Minutes域使用5-20,表示從5分到20分鐘每分鐘觸發(fā)一次

(4)/:表示起始時間開始觸發(fā),然后每隔固定時間觸發(fā)一次,例如在Minutes域使用5/20,則意味著5分鐘觸發(fā)一次,而25,45等分別觸發(fā)一次.

(5),:表示列出枚舉值值。例如:在Minutes域使用5,20,則意味著在5和20分每分鐘觸發(fā)一次。

(6)L:表示最后,只能出現(xiàn)在DayofWeek和DayofMonth域,如果在DayofWeek域使用5L,意味著在最后的一個星期四觸發(fā)。

(7)W: 表示有效工作日(周一到周五),只能出現(xiàn)在DayofMonth域,系統(tǒng)將在離指定日期的最近的有效工作日觸發(fā)事件。例如:在 DayofMonth使用5W,如果5日是星期六,則將在最近的工作日:星期五,即4日觸發(fā)。如果5日是星期天,則在6日(周一)觸發(fā);如果5日在星期一 到星期五中的一天,則就在5日觸發(fā)。另外一點,W的最近尋找不會跨過月份

(8)LW:這兩個字符可以連用,表示在某個月最后一個工作日,即最后一個星期五。

(9)#:用于確定每個月第幾個星期幾,只能出現(xiàn)在DayofMonth域。例如在4#2,表示某月的第二個星期三。

舉幾個例子:

0 0 2 1 * ? * 表示在每月的1日的凌晨2點調(diào)度任務

0 15 10 ? * MON-FRI 表示周一到周五每天上午10:15執(zhí)行作業(yè)

0 15 10 ? 6L 2002-2006 表示2002-2006年的每個月的最后一個星期五上午10:15執(zhí)行作

一個cron表達式有至少6個(也可能7個)有空格分隔的時間元素。

按順序依次為

秒(0~59)

分鐘(0~59)

小時(0~23)

天(月)(0~31,但是你需要考慮你月的天數(shù))

月(0~11)

天(星期)(1~7 1=SUN 或 SUN,MON,TUE,WED,THU,F(xiàn)RI,SAT)

年份(1970-2099)

其中每個元素可以是一個值(如6),一個連續(xù)區(qū)間(9-12),一個間隔時間(8-18/4)(/表示每隔4小時),一個列表(1,3,5),通配符。由于"月份中的日期"和"星期中的日期"這兩個元素互斥的,必須要對其中一個設置?

0 0 10,14,16 * * ? 每天上午10點,下午2點,4點

0 0/30 9-17 * * ? 朝九晚五工作時間內(nèi)每半小時

0 0 12 ? * WED 表示每個星期三中午12點

“0 0 12 * * ?” 每天中午12點觸發(fā)

“0 15 10 ? * *” 每天上午10:15觸發(fā)

“0 15 10 * * ?” 每天上午10:15觸發(fā)

“0 15 10 * * ? *” 每天上午10:15觸發(fā)

“0 15 10 * * ? 2005” 2005年的每天上午10:15觸發(fā)

“0 * 14 * * ?” 在每天下午2點到下午2:59期間的每1分鐘觸發(fā)

“0 0/5 14 * * ?” 在每天下午2點到下午2:55期間的每5分鐘觸發(fā)

“0 0/5 14,18 * * ?” 在每天下午2點到2:55期間和下午6點到6:55期間的每5分鐘觸發(fā)

“0 0-5 14 * * ?” 在每天下午2點到下午2:05期間的每1分鐘觸發(fā)

“0 10,44 14 ? 3 WED” 每年三月的星期三的下午2:10和2:44觸發(fā)

“0 15 10 ? * MON-FRI” 周一至周五的上午10:15觸發(fā)

“0 15 10 15 * ?” 每月15日上午10:15觸發(fā)

“0 15 10 L * ?” 每月最后一日的上午10:15觸發(fā)

“0 15 10 ? * 6L” 每月的最后一個星期五上午10:15觸發(fā)

“0 15 10 ? * 6L 2002-2005” 2002年至2005年的每月的最后一個星期五上午10:15觸發(fā)

“0 15 10 ? * 6#3” 每月的第三個星期五上午10:15觸發(fā)

監(jiān)聽器

可以使用監(jiān)聽器來處理消息接收

consumer.setMessageListener(new MyListener());

需要實現(xiàn)接口MessageListener

public class MyListener implements MessageListener {

	public void onMessage(Message message) {
		// TODO Auto-generated method stub
		TextMessage textMessage = (TextMessage)message;
		try {
			System.out.println("xxoo" + textMessage.getText());
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

當收到消息后會調(diào)起onMessage方法文章來源地址http://www.zghlxwxcb.cn/news/detail-853379.html

消息過濾

消息發(fā)送

		MapMessage msg1 = session.createMapMessage();
		msg1.setString("name", "qiqi");
		msg1.setString("age", "18");
		
		msg1.setStringProperty("name", "qiqi");
		msg1.setIntProperty("age", 18);
		MapMessage msg2 = session.createMapMessage();
		msg2.setString("name", "lucy");
		msg2.setString("age", "18");
		msg2.setStringProperty("name", "lucy");
		msg2.setIntProperty("age", 18);
		MapMessage msg3 = session.createMapMessage();
		msg3.setString("name", "qianqian");
		msg3.setString("age", "17");
		msg3.setStringProperty("name", "qianqian");
		msg3.setIntProperty("age", 17);

消息接收

	
		String selector1 = "age > 17";
		String selector2 = "name = 'lucy'";
		MessageConsumer consumer = session.createConsumer(queue,selector2);

到了這里,關(guān)于ActiveMQ 02 常用API的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務器費用

相關(guān)文章

  • ActiveMq學習⑦__ActiveMq協(xié)議

    ActiveMq學習⑦__ActiveMq協(xié)議

    問題一、默認的61616端口如何更改? 問題二、你生產(chǎn)上的鏈接協(xié)議如何配置的?使用tcp嗎? ActiveMQ 支持的client-broker 通訊協(xié)議有:TVP、NIO、UDP、SSL、Http(s)、VM。 其中配置TransportConnector 的文件在ActiveMQ 安裝目錄的conf/activemq.xml 中的標簽之內(nèi)。 activemq 傳輸協(xié)議的官方文檔:htt

    2024年02月05日
    瀏覽(20)
  • docker-compose安裝和使用(自啟、redis、mysql、rabbitmq、activemq、es、nginx、java應用)

    1.在線安裝docker-compose: 參考官網(wǎng):https://docs.docker.com/compose/install/other/ docker-compose安裝及簡單入門 [Docker] docker-compose使用教程 Docker系列教程22-docker-compose.yml常用命令 2、離線安裝docker-compose: 參考:Docker - 離線安裝 docker-compose(以CentOS系統(tǒng)為例) (1)首先訪問 docker-compose 的

    2024年02月05日
    瀏覽(30)
  • ActiveMq學習⑨__基于zookeeper和LevelDB搭建ActiveMQ集群

    ActiveMq學習⑨__基于zookeeper和LevelDB搭建ActiveMQ集群

    引入消息中間件后如何保證其高可用? 基于zookeeper和LevelDB搭建ActiveMQ集群。集群僅 提供主備方式的高可用集群功能,避免單點故障 。 http://activemq.apache.org/masterslave LevelDB,5.6版本之后推出了LecelDB的持久化引擎,它使用了自定義的索引代替常用的BTree索引,其持久化性能高于

    2024年02月05日
    瀏覽(19)
  • 記錄一次老服務器啟動ActiveMq時報的Could not create the Java Virtual Machine.錯誤

    記錄一次老服務器啟動ActiveMq時報的Could not create the Java Virtual Machine.錯誤

    服務器系統(tǒng)CentOS7? 1、出現(xiàn)ActiveMq服務無法連接 2、查看activemq狀態(tài) service activemq status 顯示activemq not running 3、找到ActiveMq的bin目錄,# 后臺啟動 ./activemq console 提示Could not create the Java Virtual Machine.錯誤 可以判斷是java運行環(huán)境的問題 4、再看看java版本 java -version 5、再看看activemq版

    2024年04月22日
    瀏覽(38)
  • 【ActiveMQ】Failed to start Apache ActiveMQ (localhost, ID_XXX)

    在嘗試使用\\\"binwin64activemq.bat\\\"啟動apache-activemq-5.18.2時,出現(xiàn)了以下錯誤: 錯誤原因是由于ActiveMQ無法將mqtt://0.0.0.0:1883端口綁定,因為該端口已經(jīng)被其他進程占用。但是在命令行中輸入以下命令并沒有返回結(jié)果: 解決方法是修改 confactivemq.xml 文件,找到以下部分: 將端口號

    2024年02月10日
    瀏覽(18)
  • python接收activemq服務器的消息,轉(zhuǎn)發(fā)到另外兩個activemq服務器消息中

    要使用Python接收ActiveMQ服務器的消息并將其轉(zhuǎn)發(fā)到另外兩個ActiveMQ服務器,您可以使用Python的pika庫。pika是一個流行的AMQP(高級消息隊列協(xié)議)客戶端庫,可以與ActiveMQ等消息代理進行交互。 以下是一個簡單的示例,演示如何使用pika從ActiveMQ服務器接收消息,并將其轉(zhuǎn)發(fā)到另外

    2024年01月19日
    瀏覽(21)
  • 七、ActiveMQ的傳輸協(xié)議

    七、ActiveMQ的傳輸協(xié)議

    官網(wǎng)地址:http://activemq.apache.org/configuring-version-5-transports.html ActiveMQ支持的client-broker通訊協(xié)議有:TVP、NIO、UDP、SSL、Http(s)、VM。 其中配置Transport Connector的文件在ActiveMQ安裝目錄的conf/activemq.xml中的transportConnectors標簽之內(nèi)。 URI描述信息的頭部都是采用協(xié)議名稱,唯獨在進行op

    2024年02月19日
    瀏覽(19)
  • Activemq存儲KahaDb詳解

    Activemq存儲KahaDb詳解

    ActiveMQ在不提供持久化的情況下,數(shù)據(jù)保存在內(nèi)存中,一旦應用崩潰或者重啟之后,數(shù)據(jù)都將會丟失,這顯然在大部分情況下是我們所不希望的。對此ActiveMQ提供了兩種持久化方式以供選擇。 kahaDB是一個基于文件,支持事務的、可靠,高性能,可擴展的消息存儲器,目前是a

    2024年02月04日
    瀏覽(18)
  • ActiveMQ面試題(二)

    死信隊列 ActiveMQ 中的消息重發(fā)時間間隔和重發(fā)次數(shù)嗎? 如果你想在消息處理失敗后,不被服務器刪除,還能被其他消費者處理或重試,可以關(guān)閉AUTO_ACKNOWLEDGE,將 ack 交由程序自己處理。那如果使用了 AUTO_ACKNOWLEDGE,消息是什么時候被確認的,還有沒有阻止消息確認的方法?有

    2024年02月07日
    瀏覽(25)
  • 什么是 Apache ActiveMQ?

    ActiveMQ 是一種流行的消息傳遞服務,可促進企業(yè)系統(tǒng)中大規(guī)模的不同數(shù)據(jù)。在本 ActiveMQ 教程中,我們概述了 ActiveMQ、它的優(yōu)點、它的工作原理以及何時應該使用它。 什么是 ActiveMQ? ActiveMQ 是一種流行的開源消息傳遞服務,它構(gòu)建在 Java 之上。它用作面向消息的中間件 ( MoM

    2024年02月07日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包