Active MQ 02
常用API
事務
session.commit();
session.rollback();
用來提交/回滾事務
Purge
清理消息
簽收模式
簽收代表接收端的session已收到消息的一次確認,反饋給broker
ActiveMQ支持自動簽收與手動簽收
Session.AUTO_ACKNOWLEDGE
當客戶端從receiver或onMessage成功返回時,Session自動簽收客戶端的這條消息的收條。
Session.CLIENT_ACKNOWLEDGE
客戶端通過調(diào)用消息(Message)的acknowledge方法簽收消息。在這種情況下,簽收發(fā)生在Session層面:簽收一個已經(jīng)消費的消息會自動地簽收這個Session所有已消費的收條。
Session.DUPS_OK_ACKNOWLEDGE
Session不必確保對傳送消息的簽收,這個模式可能會引起消息的重復,但是降低了Session的開銷,所以只有客戶端能容忍重復的消息,才可使用。
持久化
默認持久化是開啟的
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
優(yōu)先級
可以打亂消費順序
producer.setPriority
配置文件需要指定使用優(yōu)先級的目的地
<policyEntry queue="queue1" prioritizedMessages="true" />
消息超時/過期
producer.setTimeToLive
設置了消息超時的消息,消費端在超時后無法在消費到此消息。
給消息設置一個超時時間 -> 死信隊列 -> 拿出來 -> 重發(fā)
死信
此類消息會進入到ActiveMQ.DLQ
隊列且不會自動清除,稱為死信
此處有消息堆積的風險
修改死信隊列名稱
<policyEntry queue="f" prioritizedMessages="true" >
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
useQueueForQueueMessages: 設置使用隊列保存死信,還可以設置useQueueForTopicMessages,使用Topic來保存死信
讓非持久化的消息也進入死信隊列
<individualDeadLetterStrategy queuePrefix="DLxxQ." useQueueForQueueMessages="true" processNonPersistent="true" />
processNonPersistent=“true”
過期消息不進死信隊列
<individualDeadLetterStrategy processExpired="false" />
獨占消費者
Queue queue = session.createQueue("xxoo?consumer.exclusive=true");
還可以設置優(yōu)先級
Queue queue = session.createQueue("xxoo?consumer.exclusive=true&consumer.priority=10");
消息類型
object
發(fā)送端
Girl girl = new Girl("qiqi",25,398.0);
Message message = session.createObjectMessage(girl);
接受端
if(message instanceof ActiveMQObjectMessage) {
Girl girl = (Girl)((ActiveMQObjectMessage)message).getObject();
System.out.println(girl);
System.out.println(girl.getName());
}
如果遇到此類報錯
Exception in thread "main" javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:213)
at com.mashibing.mq.Receiver.main(Receiver.java:65)
Caused by: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112)
at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:211)
... 1 more
需要添加信任
connectionFactory.setTrustedPackages(
new ArrayList<String>(
Arrays.asList(
new String[]{
Girl.class.getPackage().getName()
}
)
)
);
bytesMessage
發(fā)送端
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes("str".getBytes());
bytesMessage.writeUTF("哈哈");
接受端
if(message instanceof BytesMessage) {
BytesMessage bm = (BytesMessage)message;
byte[] b = new byte[1024];
int len = -1;
while ((len = bm.readBytes(b)) != -1) {
System.out.println(new String(b, 0, len));
}
}
還可以使用ActiveMQ給提供的便捷方法,但要注意讀取和寫入的順序
bm.readBoolean()
bm.readUTF()
寫入文件
FileOutputStream out = null;
try {
out = new FileOutputStream("d:/aa.txt");
} catch (FileNotFoundException e2) {
e2.printStackTrace();
}
byte[] by = new byte[1024];
int len = 0 ;
try {
while((len = bm.readBytes(by))!= -1){
out.write(by,0,len);
}
} catch (Exception e1) {
e1.printStackTrace();
}
MapMessage
發(fā)送端
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","lucy");
mapMessage.setBoolean("yihun",false);
mapMessage.setInt("age", 17);
producer.send(mapMessage);
接收端
Message message = consumer.receive();
MapMessage mes = (MapMessage) message;
System.out.println(mes);
System.out.println(mes.getString("name"));
消息發(fā)送原理
同步與異步
開啟事務 | 關(guān)閉事務 | |
---|---|---|
持久化 | 異步 | 同步 |
非持久化 | 異步 | 異步 |
我們可以通過以下幾種方式來設置異步發(fā)送:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"tcp://localhost:61616"
);
// 2.獲取一個向ActiveMQ的連接
connectionFactory.setUseAsyncSend(true);
ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();
connection.setUseAsyncSend(true);
消息堆積
producer每發(fā)送一個消息,統(tǒng)計一下發(fā)送的字節(jié)數(shù),當字節(jié)數(shù)達到ProducerWindowSize值時,需要等待broker的確認,才能繼續(xù)發(fā)送。
brokerUrl中設置: tcp://localhost:61616?jms.producerWindowSize=1048576
destinationUri中設置: myQueue?producer.windowSize=1048576
延遲消息投遞
首先在配置文件中開啟延遲和調(diào)度
schedulerSupport=“true”
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
延遲發(fā)送
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000);
帶間隔的重復發(fā)送
long delay = 10 * 1000;
long period = 2 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
createProducer.send(message);
Cron表達式定時發(fā)送
Cron表達式是一個字符串,字符串以5或6個空格隔開,分為6或7個域,每一個域代表一個含義,Cron有如下兩種語法格式:
Seconds Minutes Hours DayofMonth Month DayofWeek Year或
Seconds Minutes Hours DayofMonth Month DayofWeek
每一個域可出現(xiàn)的字符如下:
Seconds:可出現(xiàn)", - * /"四個字符,有效范圍為0-59的整數(shù)
Minutes:可出現(xiàn)", - * /"四個字符,有效范圍為0-59的整數(shù)
Hours:可出現(xiàn)", - * /"四個字符,有效范圍為0-23的整數(shù)
DayofMonth:可出現(xiàn)", - * / ? L W C"八個字符,有效范圍為0-31的整數(shù)
Month:可出現(xiàn)", - * /"四個字符,有效范圍為1-12的整數(shù)或JAN-DEc
DayofWeek:可出現(xiàn)", - * / ? L C #"四個字符,有效范圍為1-7的整數(shù)或SUN-SAT兩個范圍。1表示星期天,2表示星期一, 依次類推
Year:可出現(xiàn)", - * /"四個字符,有效范圍為1970-2099年
每一個域都使用數(shù)字,但還可以出現(xiàn)如下特殊字符,它們的含義是:
(1):表示匹配該域的任意值,假如在Minutes域使用, 即表示每分鐘都會觸發(fā)事件。
(2)?:只能用在DayofMonth和DayofWeek兩個域。它也匹配域的任意值,但實際不會。因為DayofMonth和 DayofWeek會相互影響。例如想在每月的20日觸發(fā)調(diào)度,不管20日到底是星期幾,則只能使用如下寫法: 13 13 15 20 * ?, 其中最后一位只能用?,而不能使用*,如果使用*表示不管星期幾都會觸發(fā),實際上并不是這樣。
(3)-:表示范圍,例如在Minutes域使用5-20,表示從5分到20分鐘每分鐘觸發(fā)一次
(4)/:表示起始時間開始觸發(fā),然后每隔固定時間觸發(fā)一次,例如在Minutes域使用5/20,則意味著5分鐘觸發(fā)一次,而25,45等分別觸發(fā)一次.
(5),:表示列出枚舉值值。例如:在Minutes域使用5,20,則意味著在5和20分每分鐘觸發(fā)一次。
(6)L:表示最后,只能出現(xiàn)在DayofWeek和DayofMonth域,如果在DayofWeek域使用5L,意味著在最后的一個星期四觸發(fā)。
(7)W: 表示有效工作日(周一到周五),只能出現(xiàn)在DayofMonth域,系統(tǒng)將在離指定日期的最近的有效工作日觸發(fā)事件。例如:在 DayofMonth使用5W,如果5日是星期六,則將在最近的工作日:星期五,即4日觸發(fā)。如果5日是星期天,則在6日(周一)觸發(fā);如果5日在星期一 到星期五中的一天,則就在5日觸發(fā)。另外一點,W的最近尋找不會跨過月份
(8)LW:這兩個字符可以連用,表示在某個月最后一個工作日,即最后一個星期五。
(9)#:用于確定每個月第幾個星期幾,只能出現(xiàn)在DayofMonth域。例如在4#2,表示某月的第二個星期三。
舉幾個例子:
0 0 2 1 * ? * 表示在每月的1日的凌晨2點調(diào)度任務
0 15 10 ? * MON-FRI 表示周一到周五每天上午10:15執(zhí)行作業(yè)
0 15 10 ? 6L 2002-2006 表示2002-2006年的每個月的最后一個星期五上午10:15執(zhí)行作
一個cron表達式有至少6個(也可能7個)有空格分隔的時間元素。
按順序依次為
秒(0~59)
分鐘(0~59)
小時(0~23)
天(月)(0~31,但是你需要考慮你月的天數(shù))
月(0~11)
天(星期)(1~7 1=SUN 或 SUN,MON,TUE,WED,THU,F(xiàn)RI,SAT)
年份(1970-2099)
其中每個元素可以是一個值(如6),一個連續(xù)區(qū)間(9-12),一個間隔時間(8-18/4)(/表示每隔4小時),一個列表(1,3,5),通配符。由于"月份中的日期"和"星期中的日期"這兩個元素互斥的,必須要對其中一個設置?
0 0 10,14,16 * * ? 每天上午10點,下午2點,4點
0 0/30 9-17 * * ? 朝九晚五工作時間內(nèi)每半小時
0 0 12 ? * WED 表示每個星期三中午12點
“0 0 12 * * ?” 每天中午12點觸發(fā)
“0 15 10 ? * *” 每天上午10:15觸發(fā)
“0 15 10 * * ?” 每天上午10:15觸發(fā)
“0 15 10 * * ? *” 每天上午10:15觸發(fā)
“0 15 10 * * ? 2005” 2005年的每天上午10:15觸發(fā)
“0 * 14 * * ?” 在每天下午2點到下午2:59期間的每1分鐘觸發(fā)
“0 0/5 14 * * ?” 在每天下午2點到下午2:55期間的每5分鐘觸發(fā)
“0 0/5 14,18 * * ?” 在每天下午2點到2:55期間和下午6點到6:55期間的每5分鐘觸發(fā)
“0 0-5 14 * * ?” 在每天下午2點到下午2:05期間的每1分鐘觸發(fā)
“0 10,44 14 ? 3 WED” 每年三月的星期三的下午2:10和2:44觸發(fā)
“0 15 10 ? * MON-FRI” 周一至周五的上午10:15觸發(fā)
“0 15 10 15 * ?” 每月15日上午10:15觸發(fā)
“0 15 10 L * ?” 每月最后一日的上午10:15觸發(fā)
“0 15 10 ? * 6L” 每月的最后一個星期五上午10:15觸發(fā)
“0 15 10 ? * 6L 2002-2005” 2002年至2005年的每月的最后一個星期五上午10:15觸發(fā)
“0 15 10 ? * 6#3” 每月的第三個星期五上午10:15觸發(fā)
監(jiān)聽器
可以使用監(jiān)聽器來處理消息接收
consumer.setMessageListener(new MyListener());
需要實現(xiàn)接口MessageListener文章來源:http://www.zghlxwxcb.cn/news/detail-853379.html
public class MyListener implements MessageListener {
public void onMessage(Message message) {
// TODO Auto-generated method stub
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("xxoo" + textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
當收到消息后會調(diào)起onMessage方法文章來源地址http://www.zghlxwxcb.cn/news/detail-853379.html
消息過濾
消息發(fā)送
MapMessage msg1 = session.createMapMessage();
msg1.setString("name", "qiqi");
msg1.setString("age", "18");
msg1.setStringProperty("name", "qiqi");
msg1.setIntProperty("age", 18);
MapMessage msg2 = session.createMapMessage();
msg2.setString("name", "lucy");
msg2.setString("age", "18");
msg2.setStringProperty("name", "lucy");
msg2.setIntProperty("age", 18);
MapMessage msg3 = session.createMapMessage();
msg3.setString("name", "qianqian");
msg3.setString("age", "17");
msg3.setStringProperty("name", "qianqian");
msg3.setIntProperty("age", 17);
消息接收
String selector1 = "age > 17";
String selector2 = "name = 'lucy'";
MessageConsumer consumer = session.createConsumer(queue,selector2);
到了這里,關(guān)于ActiveMQ 02 常用API的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!